use std::sync::Arc;

use common_recordbatch::DfRecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
    UInt32Array, UInt64Array,
};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow_array::StringArray;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Serialize};

use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
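
/// An SST file entry collected from a region manifest.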
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// Directory of the table the file belongs to.
    pub table_dir: String,
    /// Id of the region the file belongs to.
    pub region_id: RegionId,
    /// Table id part of the region id.
    pub table_id: TableId,
    /// Region number part of the region id.
    pub region_number: RegionNumber,
    /// Group part of the region number.
    pub region_group: RegionGroup,
    /// Sequence part of the region number.
    pub region_sequence: RegionSeq,
    /// Id of the SST file.
    pub file_id: String,
    /// Level of the SST file.
    pub level: u8,
    /// Path of the SST file.
    pub file_path: String,
    /// Size of the SST file in bytes.
    pub file_size: u64,
    /// Path of the index file, if any.
    pub index_file_path: Option<String>,
    /// Size of the index file in bytes, if any.
    pub index_file_size: Option<u64>,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Minimal timestamp of rows in the file.
    pub min_ts: Timestamp,
    /// Maximal timestamp of rows in the file.
    pub max_ts: Timestamp,
    /// Sequence number associated with the file, if known.
    pub sequence: Option<u64>,
    /// Id of the region that originally created the file.
    pub origin_region_id: RegionId,
    /// Id of the node that reported the entry, if known.
    pub node_id: Option<u64>,
    /// Whether the file is visible.
    pub visible: bool,
}
impl ManifestSstEntry {
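    /// Returns the table schema of manifest SST entries.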
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("level", Ty::uint8_datatype(), false),
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
            ColumnSchema::new("visible", Ty::boolean_datatype(), false),
        ]))
    }
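
    /// Converts the given entries to a record batch matching [`Self::schema`].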
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let levels = entries.iter().map(|e| e.level);
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let num_rows = entries.iter().map(|e| e.num_rows);
        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
        // Normalize timestamps to nanoseconds to match the schema; a failed
        // conversion (out-of-range value) becomes a null.
        let min_ts = entries.iter().map(|e| {
            e.min_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let max_ts = entries.iter().map(|e| {
            e.max_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let sequences = entries.iter().map(|e| e.sequence);
        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
        let node_ids = entries.iter().map(|e| e.node_id);
        let visible_flags = entries.iter().map(|e| Some(e.visible));

        // Column order must match `Self::schema`. Non-nullable columns are built
        // with `from_iter_values`; nullable ones with `from_iter` over `Option`s.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(UInt8Array::from_iter_values(levels)),
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter_values(file_sizes)),
            Arc::new(StringArray::from_iter(index_file_paths)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(UInt64Array::from_iter_values(num_rows)),
            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
            Arc::new(UInt64Array::from_iter(sequences)),
            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
            Arc::new(UInt64Array::from_iter(node_ids)),
            Arc::new(BooleanArray::from_iter(visible_flags)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }
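
    /// Returns the reserved table name for inspecting manifest SST entries.
    ///
    /// The name deliberately contains `/` so it cannot clash with ordinary
    /// table names.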
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_manifest"
    }
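
    /// Builds a logical plan that scans the reserved inspection table,
    /// applying the projection, filters, and limit from `scan_request`.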
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}
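
/// An SST file entry discovered by listing the storage layer directly.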
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Path of the SST file.
    pub file_path: String,
    /// Size of the SST file in bytes, if known.
    pub file_size: Option<u64>,
    /// Last modified time of the file, if known.
    pub last_modified_ms: Option<Timestamp>,
    /// Id of the node that reported the entry, if known.
    pub node_id: Option<u64>,
}
impl StorageSstEntry {
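    /// Returns the table schema of storage SST entries.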
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new(
                "last_modified_ms",
                Ty::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }
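
    /// Converts the given entries to a record batch matching [`Self::schema`].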
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        // Normalize timestamps to milliseconds to match the schema; a failed
        // conversion becomes a null.
        let last_modified_ms = entries.iter().map(|e| {
            e.last_modified_ms
                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
        });
        let node_ids = entries.iter().map(|e| e.node_id);

        // Column order must match `Self::schema`.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter(file_sizes)),
            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }
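
    /// Returns the reserved table name for inspecting storage SST entries.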
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_storage"
    }
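
    /// Builds a logical plan that scans the reserved inspection table,
    /// applying the projection, filters, and limit from `scan_request`.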
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}
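
/// Builds a `scan -> filter -> limit` logical plan over a reserved inspection
/// table backed by the given schema.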
fn build_plan_helper(
    scan_request: ScanRequest,
    table_name: &str,
    schema: SchemaRef,
) -> Result<LogicalPlan, DataFusionError> {
    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());

    let mut builder = LogicalPlanBuilder::scan(
        table_name,
        Arc::new(table_source),
        scan_request.projection.clone(),
    )?;

    // Stack each pushed-down filter on top of the scan.
    for filter in scan_request.filters {
        builder = builder.filter(filter)?;
    }

    // `limit(skip, fetch)`: skip no rows and fetch at most `limit` rows.
    if let Some(limit) = scan_request.limit {
        builder = builder.limit(0, Some(limit))?;
    }

    builder.build()
}
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;
    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
        // A region number packs the group into the high 8 bits and the
        // sequence into the low 24 bits.
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);
        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                min_ts: Timestamp::new_millisecond(1000),
                max_ts: Timestamp::new_second(2),
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
                visible: false,
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                min_ts: Timestamp::new_nanosecond(5),
                max_ts: Timestamp::new_microsecond(2000),
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
                visible: true,
            },
        ];
        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
            assert_eq!(f.name(), batch.schema().field(i).name());
            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
        }
        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids = batch
            .column(6)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let levels = batch
            .column(7)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths = batch
            .column(8)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes = batch
            .column(9)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths = batch
            .column(10)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = batch
            .column(11)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = batch
            .column(12)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let min_ts = batch
            .column(14)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts = batch
            .column(15)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences = batch
            .column(16)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = batch
            .column(17)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids = batch
            .column(18)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));

        let visible = batch
            .column(19)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!visible.value(0));
        assert!(visible.value(1));
    }
    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }
    #[test]
    fn test_manifest_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();

        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "region_id");
        assert_eq!(fields[2].name(), "table_id");
    }
    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "file_path");
        assert_eq!(fields[1].name(), "last_modified_ms");
    }
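
    /// Recursively unwraps `Filter` and `Limit` nodes down to the underlying
    /// `TableScan`, reporting whether each node kind was seen along the way.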
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        match plan {
            LogicalPlan::Filter(f) => {
                let (scan, _, has_limit) = extract_scan(&f.input);
                (scan, true, has_limit)
            }
            LogicalPlan::Limit(Limit { input, .. }) => {
                let (scan, has_filter, _) = extract_scan(input);
                (scan, has_filter, true)
            }
            LogicalPlan::TableScan(scan) => (scan, false, false),
            other => panic!("unexpected plan: {other:?}"),
        }
    }
}