1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23
24use common_base::readable_size::ReadableSize;
25use common_time::Timestamp;
26use serde::{Deserialize, Serialize};
27use smallvec::SmallVec;
28use snafu::{ResultExt, Snafu};
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31use uuid::Uuid;
32
33use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
34use crate::sst::location;
35
/// Numeric level assigned to an SST file (stored in [`FileMeta::level`]).
pub type Level = u8;

/// Upper bound used for [`Level`] values.
///
/// NOTE(review): whether this bound is inclusive is not visible in this file —
/// confirm against the call sites before relying on it.
pub const MAX_LEVEL: Level = 2;
40
41#[derive(Debug, Snafu, PartialEq)]
42pub struct ParseIdError {
43 source: uuid::Error,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
48pub struct FileId(Uuid);
49
50impl FileId {
51 pub fn random() -> FileId {
53 FileId(Uuid::new_v4())
54 }
55
56 pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
58 Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
59 }
60
61 pub fn as_bytes(&self) -> &[u8] {
63 self.0.as_bytes()
64 }
65}
66
67impl From<FileId> for Uuid {
68 fn from(value: FileId) -> Self {
69 value.0
70 }
71}
72
73impl fmt::Display for FileId {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(f, "{}", self.0)
76 }
77}
78
79impl FromStr for FileId {
80 type Err = ParseIdError;
81
82 fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
83 FileId::parse_str(s)
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub struct RegionFileId {
92 region_id: RegionId,
94 file_id: FileId,
96}
97
98impl RegionFileId {
99 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
101 Self { region_id, file_id }
102 }
103
104 pub fn region_id(&self) -> RegionId {
106 self.region_id
107 }
108
109 pub fn file_id(&self) -> FileId {
111 self.file_id
112 }
113}
114
115impl fmt::Display for RegionFileId {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 write!(f, "{}/{}", self.region_id, self.file_id)
118 }
119}
120
121pub type FileTimeRange = (Timestamp, Timestamp);
124
125pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
127 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
128 let (_, l_end) = l;
129 let (r_start, _) = r;
130
131 r_start <= l_end
132}
133
134#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
136#[serde(default)]
137pub struct FileMeta {
138 pub region_id: RegionId,
140 pub file_id: FileId,
142 pub time_range: FileTimeRange,
145 pub level: Level,
147 pub file_size: u64,
149 pub available_indexes: SmallVec<[IndexType; 4]>,
151 pub index_file_size: u64,
153 pub num_rows: u64,
159 pub num_row_groups: u64,
165 pub sequence: Option<NonZeroU64>,
170}
171
172impl Debug for FileMeta {
173 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
174 let mut debug_struct = f.debug_struct("FileMeta");
175 debug_struct
176 .field("region_id", &self.region_id)
177 .field_with("file_id", |f| write!(f, "{} ", self.file_id))
178 .field_with("time_range", |f| {
179 write!(
180 f,
181 "({}, {}) ",
182 self.time_range.0.to_iso8601_string(),
183 self.time_range.1.to_iso8601_string()
184 )
185 })
186 .field("level", &self.level)
187 .field("file_size", &ReadableSize(self.file_size));
188 if !self.available_indexes.is_empty() {
189 debug_struct
190 .field("available_indexes", &self.available_indexes)
191 .field("index_file_size", &ReadableSize(self.index_file_size));
192 }
193 debug_struct
194 .field("num_rows", &self.num_rows)
195 .field("num_row_groups", &self.num_row_groups)
196 .field_with("sequence", |f| match self.sequence {
197 None => {
198 write!(f, "None")
199 }
200 Some(seq) => {
201 write!(f, "{}", seq)
202 }
203 })
204 .finish()
205 }
206}
207
208#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
210pub enum IndexType {
211 InvertedIndex,
213 FulltextIndex,
215 BloomFilterIndex,
217}
218
219impl FileMeta {
220 pub fn exists_index(&self) -> bool {
221 !self.available_indexes.is_empty()
222 }
223
224 pub fn inverted_index_available(&self) -> bool {
226 self.available_indexes.contains(&IndexType::InvertedIndex)
227 }
228
229 pub fn fulltext_index_available(&self) -> bool {
231 self.available_indexes.contains(&IndexType::FulltextIndex)
232 }
233
234 pub fn bloom_filter_index_available(&self) -> bool {
236 self.available_indexes
237 .contains(&IndexType::BloomFilterIndex)
238 }
239
240 pub fn index_file_size(&self) -> u64 {
241 self.index_file_size
242 }
243
244 pub fn file_id(&self) -> RegionFileId {
246 RegionFileId::new(self.region_id, self.file_id)
247 }
248}
249
250#[derive(Clone)]
252pub struct FileHandle {
253 inner: Arc<FileHandleInner>,
254}
255
256impl fmt::Debug for FileHandle {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("FileHandle")
259 .field("meta", self.meta_ref())
260 .field("compacting", &self.compacting())
261 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
262 .finish()
263 }
264}
265
266impl FileHandle {
267 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
268 FileHandle {
269 inner: Arc::new(FileHandleInner::new(meta, file_purger)),
270 }
271 }
272
273 pub fn region_id(&self) -> RegionId {
275 self.inner.meta.region_id
276 }
277
278 pub fn file_id(&self) -> RegionFileId {
280 RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
281 }
282
283 pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
285 location::sst_file_path(file_dir, self.file_id(), path_type)
286 }
287
288 pub fn time_range(&self) -> FileTimeRange {
290 self.inner.meta.time_range
291 }
292
293 pub fn mark_deleted(&self) {
295 self.inner.deleted.store(true, Ordering::Relaxed);
296 }
297
298 pub fn compacting(&self) -> bool {
299 self.inner.compacting.load(Ordering::Relaxed)
300 }
301
302 pub fn set_compacting(&self, compacting: bool) {
303 self.inner.compacting.store(compacting, Ordering::Relaxed);
304 }
305
306 pub fn meta_ref(&self) -> &FileMeta {
308 &self.inner.meta
309 }
310
311 pub fn size(&self) -> u64 {
312 self.inner.meta.file_size
313 }
314
315 pub fn index_size(&self) -> u64 {
316 self.inner.meta.index_file_size
317 }
318
319 pub fn num_rows(&self) -> usize {
320 self.inner.meta.num_rows as usize
321 }
322}
323
324struct FileHandleInner {
328 meta: FileMeta,
329 compacting: AtomicBool,
330 deleted: AtomicBool,
331 file_purger: FilePurgerRef,
332}
333
334impl Drop for FileHandleInner {
335 fn drop(&mut self) {
336 if self.deleted.load(Ordering::Relaxed) {
337 self.file_purger.send_request(PurgeRequest {
338 file_meta: self.meta.clone(),
339 });
340 }
341 }
342}
343
344impl FileHandleInner {
345 fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
346 FileHandleInner {
347 meta,
348 compacting: AtomicBool::new(false),
349 deleted: AtomicBool::new(false),
350 file_purger,
351 }
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// A random id must survive a round trip through `Display`, `parse_str`
    /// and `FromStr`.
    #[test]
    fn test_file_id() {
        let original = FileId::random();
        let rendered = original.to_string();
        assert_eq!(original.0.to_string(), rendered);

        let via_parse_str = FileId::parse_str(&rendered).unwrap();
        assert_eq!(original, via_parse_str);

        let via_from_str: FileId = rendered.parse().unwrap();
        assert_eq!(original, via_from_str);
    }

    /// A `FileId` serializes to a JSON string holding its textual form and
    /// deserializes back to the same value.
    #[test]
    fn test_file_id_serialization() {
        let id = FileId::random();
        let encoded = serde_json::to_string(&id).unwrap();
        assert_eq!(format!("\"{id}\""), encoded);

        let decoded: FileId = serde_json::from_str(&encoded).unwrap();
        assert_eq!(id, decoded);
    }

    /// Builds a `FileMeta` with the given id and level, a single inverted index
    /// and every other field zeroed or empty.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        let available_indexes = [IndexType::InvertedIndex].into_iter().collect();
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes,
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    /// Serializing and then deserializing a `FileMeta` yields an equal value.
    #[test]
    fn test_deserialize_file_meta() {
        let expected = create_file_meta(FileId::random(), 0);
        let encoded = serde_json::to_string(&expected).unwrap();
        let decoded: FileMeta = serde_json::from_str(&encoded).unwrap();
        assert_eq!(expected, decoded);
    }

    /// A JSON document that omits several fields still deserializes, with the
    /// missing fields taking their default values.
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";

        let expected_id: FileId = "bc5896ec-e4d8-4017-a80d-f2de73188d55".parse().unwrap();
        let expected = create_file_meta(expected_id, 0);

        let decoded: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(expected, decoded);
    }
}