1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{error, info};
25use common_time::Timestamp;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use store_api::region_request::PathType;
30use store_api::storage::{FileId, RegionId};
31
32use crate::access_layer::AccessLayerRef;
33use crate::cache::CacheManagerRef;
34use crate::cache::file_cache::{FileType, IndexKey};
35use crate::sst::file_purger::FilePurgerRef;
36use crate::sst::location;
37
38fn serialize_partition_expr<S>(
40 partition_expr: &Option<PartitionExpr>,
41 serializer: S,
42) -> Result<S::Ok, S::Error>
43where
44 S: serde::Serializer,
45{
46 use serde::ser::Error;
47
48 match partition_expr {
49 None => serializer.serialize_none(),
50 Some(expr) => {
51 let json_str = expr.as_json_str().map_err(S::Error::custom)?;
52 serializer.serialize_some(&json_str)
53 }
54 }
55}
56
57fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
58where
59 D: serde::Deserializer<'de>,
60{
61 use serde::de::Error;
62
63 let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
64 match opt_json_str {
65 None => Ok(None),
66 Some(json_str) => {
67 if json_str.is_empty() {
68 Ok(None)
70 } else {
71 PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
73 }
74 }
75 }
76}
77
/// Numeric type used for the `level` of a file (see `FileMeta::level`).
pub type Level = u8;
/// Level bound constant.
// NOTE(review): presumably the number of levels or the highest valid level;
// this file does not show how it is used — confirm against callers.
pub const MAX_LEVEL: Level = 2;
82
/// Identifier of a file qualified by its region: a `(RegionId, FileId)` pair.
///
/// Displayed as `<region_id>/<file_id>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// Region component of the id.
    region_id: RegionId,
    /// File component of the id.
    file_id: FileId,
}
93
94impl RegionFileId {
95 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
97 Self { region_id, file_id }
98 }
99
100 pub fn region_id(&self) -> RegionId {
102 self.region_id
103 }
104
105 pub fn file_id(&self) -> FileId {
107 self.file_id
108 }
109}
110
111impl fmt::Display for RegionFileId {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 write!(f, "{}/{}", self.region_id, self.file_id)
114 }
115}
116
/// Time range of a file as a `(start, end)` pair of timestamps.
///
/// `overlaps` treats both ends as inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
120
121pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
123 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
124 let (_, l_end) = l;
125 let (r_start, _) = r;
126
127 r_start <= l_end
128}
129
/// Metadata describing a file and its optional index file.
///
/// `#[serde(default)]` is set on the struct, so fields missing from the
/// serialized form are filled from `Default` when deserializing.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region id; paired with `file_id` by [`FileMeta::file_id`].
    pub region_id: RegionId,
    /// Id of the file itself.
    pub file_id: FileId,
    /// Time range of the file.
    pub time_range: FileTimeRange,
    /// Level of the file.
    pub level: Level,
    /// Size of the file (rendered through `ReadableSize` in `Debug`).
    // NOTE(review): presumably in bytes — confirm.
    pub file_size: u64,
    /// Index types available for this file; empty means the file has no index.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file (only printed by `Debug` when an index exists).
    pub index_file_size: u64,
    /// Id of the index file. When `None`, [`FileMeta::index_file_id`] falls
    /// back to `file_id`.
    pub index_file_id: Option<FileId>,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Optional non-zero sequence number attached to the file.
    // NOTE(review): the exact meaning of the sequence is not visible here — confirm.
    pub sequence: Option<NonZeroU64>,
    /// Optional partition expression.
    ///
    /// Serialized as a JSON string (or none); on deserialization a null, a
    /// missing field, and an empty string all become `None`.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    // NOTE(review): how this is computed is not visible here — confirm.
    pub num_series: u64,
}
189
190impl Debug for FileMeta {
191 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
192 let mut debug_struct = f.debug_struct("FileMeta");
193 debug_struct
194 .field("region_id", &self.region_id)
195 .field_with("file_id", |f| write!(f, "{} ", self.file_id))
196 .field_with("time_range", |f| {
197 write!(
198 f,
199 "({}, {}) ",
200 self.time_range.0.to_iso8601_string(),
201 self.time_range.1.to_iso8601_string()
202 )
203 })
204 .field("level", &self.level)
205 .field("file_size", &ReadableSize(self.file_size));
206 if !self.available_indexes.is_empty() {
207 debug_struct
208 .field("available_indexes", &self.available_indexes)
209 .field("index_file_size", &ReadableSize(self.index_file_size));
210 }
211 debug_struct
212 .field("num_rows", &self.num_rows)
213 .field("num_row_groups", &self.num_row_groups)
214 .field_with("sequence", |f| match self.sequence {
215 None => {
216 write!(f, "None")
217 }
218 Some(seq) => {
219 write!(f, "{}", seq)
220 }
221 })
222 .field("partition_expr", &self.partition_expr)
223 .field("num_series", &self.num_series)
224 .finish()
225 }
226}
227
/// Kind of index that can be available for a file
/// (see `FileMeta::available_indexes`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}
238
239impl FileMeta {
240 pub fn exists_index(&self) -> bool {
241 !self.available_indexes.is_empty()
242 }
243
244 pub fn inverted_index_available(&self) -> bool {
246 self.available_indexes.contains(&IndexType::InvertedIndex)
247 }
248
249 pub fn fulltext_index_available(&self) -> bool {
251 self.available_indexes.contains(&IndexType::FulltextIndex)
252 }
253
254 pub fn bloom_filter_index_available(&self) -> bool {
256 self.available_indexes
257 .contains(&IndexType::BloomFilterIndex)
258 }
259
260 pub fn index_file_size(&self) -> u64 {
261 self.index_file_size
262 }
263
264 pub fn file_id(&self) -> RegionFileId {
266 RegionFileId::new(self.region_id, self.file_id)
267 }
268
269 pub fn index_file_id(&self) -> RegionFileId {
272 if let Some(index_file_id) = self.index_file_id {
273 RegionFileId::new(self.region_id, index_file_id)
274 } else {
275 self.file_id()
276 }
277 }
278}
279
/// Cloneable handle to a file.
///
/// All clones share one `FileHandleInner` through an `Arc`, so the metadata
/// and the `compacting` / `deleted` flags are common to every clone.
#[derive(Clone)]
pub struct FileHandle {
    /// Shared state behind the handle.
    inner: Arc<FileHandleInner>,
}
285
286impl fmt::Debug for FileHandle {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 f.debug_struct("FileHandle")
289 .field("meta", self.meta_ref())
290 .field("compacting", &self.compacting())
291 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
292 .finish()
293 }
294}
295
296impl FileHandle {
297 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
298 FileHandle {
299 inner: Arc::new(FileHandleInner::new(meta, file_purger)),
300 }
301 }
302
303 pub fn region_id(&self) -> RegionId {
305 self.inner.meta.region_id
306 }
307
308 pub fn file_id(&self) -> RegionFileId {
310 RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
311 }
312
313 pub fn index_file_id(&self) -> RegionFileId {
316 if let Some(index_file_id) = self.inner.meta.index_file_id {
317 RegionFileId::new(self.inner.meta.region_id, index_file_id)
318 } else {
319 self.file_id()
320 }
321 }
322
323 pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
325 location::sst_file_path(table_dir, self.file_id(), path_type)
326 }
327
328 pub fn time_range(&self) -> FileTimeRange {
330 self.inner.meta.time_range
331 }
332
333 pub fn mark_deleted(&self) {
335 self.inner.deleted.store(true, Ordering::Relaxed);
336 }
337
338 pub fn compacting(&self) -> bool {
339 self.inner.compacting.load(Ordering::Relaxed)
340 }
341
342 pub fn set_compacting(&self, compacting: bool) {
343 self.inner.compacting.store(compacting, Ordering::Relaxed);
344 }
345
346 pub fn meta_ref(&self) -> &FileMeta {
348 &self.inner.meta
349 }
350
351 pub fn file_purger(&self) -> FilePurgerRef {
352 self.inner.file_purger.clone()
353 }
354
355 pub fn size(&self) -> u64 {
356 self.inner.meta.file_size
357 }
358
359 pub fn index_size(&self) -> u64 {
360 self.inner.meta.index_file_size
361 }
362
363 pub fn num_rows(&self) -> usize {
364 self.inner.meta.num_rows as usize
365 }
366
367 pub fn level(&self) -> Level {
368 self.inner.meta.level
369 }
370
371 pub fn is_deleted(&self) -> bool {
372 self.inner.deleted.load(Ordering::Relaxed)
373 }
374}
375
/// State shared by all clones of a `FileHandle`.
///
/// Creating it calls `new_file` on the purger and dropping it calls
/// `remove_file`, passing the `deleted` flag.
struct FileHandleInner {
    /// Metadata of the file.
    meta: FileMeta,
    /// Flag read and written through `FileHandle::compacting` / `set_compacting`.
    compacting: AtomicBool,
    /// Flag set by `FileHandle::mark_deleted`; forwarded to the purger on drop.
    deleted: AtomicBool,
    /// Purger notified when the inner state is created and dropped.
    file_purger: FilePurgerRef,
}
385
386impl Drop for FileHandleInner {
387 fn drop(&mut self) {
388 self.file_purger
389 .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
390 }
391}
392
393impl FileHandleInner {
394 fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
395 file_purger.new_file(&meta);
396 FileHandleInner {
397 meta,
398 compacting: AtomicBool::new(false),
399 deleted: AtomicBool::new(false),
400 file_purger,
401 }
402 }
403}
404
405pub async fn delete_files(
407 region_id: RegionId,
408 file_ids: &[(FileId, FileId)],
409 delete_index: bool,
410 access_layer: &AccessLayerRef,
411 cache_manager: &Option<CacheManagerRef>,
412) -> crate::error::Result<()> {
413 if let Some(cache) = &cache_manager {
415 for (file_id, _) in file_ids {
416 cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
417 }
418 }
419 let mut deleted_files = Vec::with_capacity(file_ids.len());
420
421 for (file_id, index_file_id) in file_ids {
422 let region_file_id = RegionFileId::new(region_id, *file_id);
423 match access_layer
424 .delete_sst(
425 &RegionFileId::new(region_id, *file_id),
426 &RegionFileId::new(region_id, *index_file_id),
427 )
428 .await
429 {
430 Ok(_) => {
431 deleted_files.push(*file_id);
432 }
433 Err(e) => {
434 error!(e; "Failed to delete sst and index file for {}", region_file_id);
435 }
436 }
437 }
438
439 info!(
440 "Deleted {} files for region {}: {:?}",
441 deleted_files.len(),
442 region_id,
443 deleted_files
444 );
445
446 for (file_id, index_file_id) in file_ids {
447 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
448 if delete_index {
450 write_cache
451 .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
452 .await;
453 }
454
455 write_cache
457 .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
458 .await;
459 }
460
461 if let Err(e) = access_layer
463 .puffin_manager_factory()
464 .purge_stager(RegionFileId::new(region_id, *index_file_id))
465 .await
466 {
467 error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
468 index_file_id, region_id);
469 }
470 }
471 Ok(())
472}
473
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    /// Builds a `FileMeta` for region 0 with the given id and level, an
    /// inverted index marked available, and every other field at its default.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            // Remaining fields are plain integers / options: 0 and `None`.
            ..Default::default()
        }
    }

    /// A `FileMeta` survives a JSON round trip unchanged.
    #[test]
    fn test_deserialize_file_meta() {
        let original = create_file_meta(FileId::random(), 0);
        let json = serde_json::to_string(&original).unwrap();
        let restored: FileMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(original, restored);
    }

    /// JSON that omits most fields deserializes with defaults filled in.
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let expected_id = FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap();
        let expected = create_file_meta(expected_id, 0);

        let actual: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(expected, actual);
    }

    /// A partition expression is serialized as a JSON string and round-trips;
    /// a `None` expression round-trips as well.
    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            partition_expr: Some(partition_expr),
            ..create_file_meta(file_id, 0)
        };

        // Round trip with an expression present.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The expression must be stored as a string containing its JSON form.
        let as_json: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        let partition_expr_json = as_json["partition_expr"]
            .as_str()
            .expect("partition_expr should be serialized as a string");
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // Round trip without an expression.
        let mut file_meta_none = file_meta_with_partition.clone();
        file_meta_none.partition_expr = None;
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    /// Covers the accepted encodings of `partition_expr`: a JSON string, an
    /// empty string, an explicit null, and a missing field.
    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Expression encoded as a JSON string.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        let expr = file_meta
            .partition_expr
            .expect("partition_expr should be present");
        assert_eq!(expr.to_string(), "a >= 10");

        // Empty string means no expression.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Explicit null means no expression.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Field absent altogether: falls back to the default, `None`.
        let json_without_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_missing: FileMeta = serde_json::from_str(json_without_expr).unwrap();
        assert!(file_meta_missing.partition_expr.is_none());
    }
}