use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;

use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;

/// Type to store the SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;

/// Error returned when parsing a [FileId] from a string fails.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    source: uuid::Error,
}

/// Unique id of an SST file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);

impl FileId {
    /// Returns a new randomly generated [FileId].
    pub fn random() -> FileId {
        FileId(Uuid::new_v4())
    }

    /// Parses a [FileId] from a UUID string.
    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
    }

    /// Returns the data file name, i.e. the id with a `.parquet` suffix.
    pub fn as_parquet(&self) -> String {
        format!("{}{}", self, ".parquet")
    }

    /// Returns the index file name, i.e. the id with a `.puffin` suffix.
    pub fn as_puffin(&self) -> String {
        format!("{}{}", self, ".puffin")
    }

    /// Returns the id as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_bytes()
    }
}

impl From<FileId> for Uuid {
    fn from(value: FileId) -> Self {
        value.0
    }
}

impl fmt::Display for FileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for FileId {
    type Err = ParseIdError;

    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
        FileId::parse_str(s)
    }
}

/// Timestamp range of an SST file. Both the start and the end are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

/// Checks whether two inclusive time ranges overlap with each other.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    // Order the ranges by start so that `l` always starts first.
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    // Both ends are inclusive, so the ranges overlap when the later range
    // starts at or before the earlier range ends.
    r_start <= l_end
}

/// Metadata of an SST file.
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Inclusive timestamp range of the file.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Indexes available for the file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Optional sequence number associated with the file.
    pub sequence: Option<NonZeroU64>,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size));
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .finish()
    }
}

/// Type of index attached to an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

impl FileMeta {
    /// Returns true if the file has any index available.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if the inverted index is available.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the full-text index is available.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the bloom filter index is available.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file in bytes.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }
}

/// Handle to an SST file.
///
/// Cloning a [FileHandle] is cheap: all clones share the same inner state.
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the id of the file.
    pub fn file_id(&self) -> FileId {
        self.inner.meta.file_id
    }

    /// Returns the complete file path of the file under `file_dir`.
    pub fn file_path(&self, file_dir: &str) -> String {
        location::sst_file_path(file_dir, self.file_id())
    }

    /// Returns the inclusive time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted so it is purged once the last handle is dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns true if the file is being compacted.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets whether the file is being compacted.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the file metadata.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Returns the size of the data file in bytes.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }
}

/// Inner state shared by all clones of a [FileHandle].
///
/// When the last handle is dropped and the file is marked deleted, a purge
/// request for the file is sent to the file purger.
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        if self.deleted.load(Ordering::Relaxed) {
            self.file_purger.send_request(PurgeRequest {
                file_meta: self.meta.clone(),
            });
        }
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_id() {
        let id = FileId::random();
        let uuid_str = id.to_string();
        assert_eq!(id.0.to_string(), uuid_str);

        let parsed = FileId::parse_str(&uuid_str).unwrap();
        assert_eq!(id, parsed);
        let parsed = uuid_str.parse().unwrap();
        assert_eq!(id, parsed);
    }
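
    // Illustrative test (not in the original file): `FileId::parse_str` and the
    // `FromStr` impl should surface a `ParseIdError` for input that is not a
    // valid UUID.
    #[test]
    fn test_file_id_parse_invalid() {
        assert!(FileId::parse_str("not-a-uuid").is_err());
        assert!("not-a-uuid".parse::<FileId>().is_err());
    }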

    #[test]
    fn test_file_id_serialization() {
        let id = FileId::random();
        let json = serde_json::to_string(&id).unwrap();
        assert_eq!(format!("\"{id}\""), json);

        let parsed = serde_json::from_str(&json).unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_as_parquet() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
            id.as_parquet()
        );
    }
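
    // Companion sketch to `test_file_id_as_parquet` (added, not from the
    // original file): `as_puffin` follows the same pattern with a `.puffin`
    // suffix.
    #[test]
    fn test_file_id_as_puffin() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.puffin",
            id.as_puffin()
        );
    }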

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }
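
    // Illustrative test (not in the original file) for the index helpers on
    // `FileMeta`: `create_file_meta` only sets `InvertedIndex`, so only the
    // inverted index should be reported as available.
    #[test]
    fn test_file_meta_index_availability() {
        let file_meta = create_file_meta(FileId::random(), 0);
        assert!(file_meta.exists_index());
        assert!(file_meta.inverted_index_available());
        assert!(!file_meta.fulltext_index_available());
        assert!(!file_meta.bloom_filter_index_available());
        assert_eq!(0, file_meta.index_file_size());
    }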

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
            \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
            \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }
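
    // Sketch of a test for `overlaps` (added, not from the original file). It
    // assumes `Timestamp::new_millisecond` is available in `common_time` for
    // constructing timestamps.
    #[test]
    fn test_overlaps() {
        let range = |start, end| {
            (
                Timestamp::new_millisecond(start),
                Timestamp::new_millisecond(end),
            )
        };
        // Overlapping and touching ranges (both ends are inclusive).
        assert!(overlaps(&range(0, 10), &range(5, 15)));
        assert!(overlaps(&range(5, 15), &range(0, 10)));
        assert!(overlaps(&range(0, 10), &range(10, 20)));
        // Disjoint ranges.
        assert!(!overlaps(&range(0, 10), &range(11, 20)));
    }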
}