use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;

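/// Serializes an optional [PartitionExpr] as a JSON string.
///
/// `None` is serialized as `null`.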
fn serialize_partition_expr<S>(
    partition_expr: &Option<PartitionExpr>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    use serde::ser::Error;

    match partition_expr {
        None => serializer.serialize_none(),
        Some(expr) => {
            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
            serializer.serialize_some(&json_str)
        }
    }
}

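/// Deserializes an optional [PartitionExpr] from a JSON string.
///
/// Both `null` and the empty string deserialize to `None`, which keeps metadata
/// written before the field existed readable.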
fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;

    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
    match opt_json_str {
        None => Ok(None),
        Some(json_str) => {
            if json_str.is_empty() {
                Ok(None)
            } else {
                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
            }
        }
    }
}

/// Type to store the SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;

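/// A unique identifier of an SST file, composed of the [RegionId] that owns the
/// file and the [FileId] of the file itself.
///
/// Its [Display](fmt::Display) form is `{region_id}/{file_id}`.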
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    region_id: RegionId,
    file_id: FileId,
}

impl RegionFileId {
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}

impl fmt::Display for RegionFileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}

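/// Time range of an SST file, as (start timestamp, end timestamp).
/// Both endpoints are treated as inclusive by [overlaps].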
pub type FileTimeRange = (Timestamp, Timestamp);

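/// Checks whether two inclusive time ranges overlap with each other.
/// Ranges that merely touch at an endpoint count as overlapping.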
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    // Order the ranges so that `l` starts first; they overlap iff `r` starts
    // no later than `l` ends.
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}

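/// Metadata of an SST file.
///
/// The struct is deserialized with `#[serde(default)]`, so metadata written by
/// older versions that lack newer fields can still be read.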
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Id of the region that the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Time range of data in the file.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Index types available in the index file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Optional sequence number associated with the file.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression of the file, persisted as a JSON string.
    /// `null` and the empty string both deserialize to `None`.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size));
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => write!(f, "None"),
                Some(seq) => write!(f, "{}", seq),
            })
            .field("partition_expr", &self.partition_expr)
            .finish()
    }
}

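/// Type of an index that can accompany an SST file.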
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

impl FileMeta {
    /// Returns true if the file has any index available.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a full-text index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file in bytes.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }
}

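/// Cheaply cloneable handle to an SST file.
///
/// The handle tracks whether the file is being compacted or has been marked as
/// deleted; the underlying [FileHandleInner] notifies the file purger when the
/// last handle is dropped.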
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the id of the region that owns the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the path of the SST file under `file_dir`.
    pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(file_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted so the purger can remove it once the last
    /// handle is dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns true if the file is being compacted.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets whether the file is being compacted.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the [FileMeta] of the file.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    /// Returns the size of the SST file in bytes.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    /// Returns the level of the file.
    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    /// Returns true if the file is marked as deleted.
    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}

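/// Inner state shared by clones of a [FileHandle].
///
/// On drop, the file purger is notified together with the `deleted` flag, so a
/// file marked as deleted can be purged after the last handle goes away.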
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        self.file_purger
            .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

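/// Deletes the SST files identified by `file_ids` from `region_id`.
///
/// Cached parquet metadata and write cache entries are removed as well (the
/// index entry only when `delete_index` is true), and the puffin stager is
/// purged for each file. Deletion errors are logged instead of aborting the
/// whole batch.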
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[FileId],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    if let Some(cache) = &cache_manager {
        for file_id in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for file_id in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer.delete_sst(&region_file_id).await {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    info!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    for file_id in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);

        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
            if delete_index {
                write_cache
                    .remove(IndexKey::new(region_id, *file_id, FileType::Puffin))
                    .await;
            }

            write_cache
                .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
                .await;
        }

        if let Err(e) = access_layer
            .puffin_manager_factory()
            .purge_stager(region_file_id)
            .await
        {
            error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
                file_id, region_id);
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
            \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
            \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
        };

        // The partition expression survives a JSON round trip.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The expression is stored as a JSON string, not as a nested object.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // A `None` expression also round trips.
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Case 1: partition_expr present as a JSON string.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Case 2: partition_expr is an empty string.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Case 3: partition_expr is null.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Case 4: partition_expr field is missing entirely (metadata written by
        // older versions); `#[serde(default)]` fills in `None`.
        let json_without_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_missing: FileMeta = serde_json::from_str(json_without_expr).unwrap();
        assert!(file_meta_missing.partition_expr.is_none());
    }
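
    // An illustrative sanity check for `overlaps`, assuming the
    // `Timestamp::new_millisecond` constructor from `common_time`. It exercises
    // the partially overlapping, touching, and disjoint cases.
    #[test]
    fn test_overlaps() {
        // Builds an inclusive millisecond time range.
        let range = |start, end| {
            (
                Timestamp::new_millisecond(start),
                Timestamp::new_millisecond(end),
            )
        };
        // Plain overlap, in both argument orders.
        assert!(overlaps(&range(0, 10), &range(5, 15)));
        assert!(overlaps(&range(5, 15), &range(0, 10)));
        // Ranges are inclusive, so touching endpoints count as overlapping.
        assert!(overlaps(&range(0, 10), &range(10, 20)));
        // Disjoint ranges do not overlap.
        assert!(!overlaps(&range(0, 10), &range(11, 20)));
    }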
}