1use std::sync::Arc;
16
17use common_recordbatch::DfRecordBatch;
18use common_time::Timestamp;
19use common_time::timestamp::TimeUnit;
20use datafusion_common::DataFusionError;
21use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
22use datatypes::arrow::array::{
23 ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
24 UInt64Array,
25};
26use datatypes::arrow::error::ArrowError;
27use datatypes::arrow_array::StringArray;
28use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
29use serde::{Deserialize, Serialize};
30
31use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct ManifestSstEntry {
36 pub table_dir: String,
38 pub region_id: RegionId,
40 pub table_id: TableId,
42 pub region_number: RegionNumber,
44 pub region_group: RegionGroup,
46 pub region_sequence: RegionSeq,
48 pub file_id: String,
50 pub level: u8,
52 pub file_path: String,
54 pub file_size: u64,
56 pub index_file_path: Option<String>,
58 pub index_file_size: Option<u64>,
60 pub num_rows: u64,
62 pub num_row_groups: u64,
64 pub min_ts: Timestamp,
66 pub max_ts: Timestamp,
68 pub sequence: Option<u64>,
70 pub origin_region_id: RegionId,
72 pub node_id: Option<u64>,
74}
75
76impl ManifestSstEntry {
77 pub fn schema() -> SchemaRef {
79 use datatypes::prelude::ConcreteDataType as Ty;
80 Arc::new(Schema::new(vec![
81 ColumnSchema::new("table_dir", Ty::string_datatype(), false),
82 ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
83 ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
84 ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
85 ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
86 ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
87 ColumnSchema::new("file_id", Ty::string_datatype(), false),
88 ColumnSchema::new("level", Ty::uint8_datatype(), false),
89 ColumnSchema::new("file_path", Ty::string_datatype(), false),
90 ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
91 ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
92 ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
93 ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
94 ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
95 ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
96 ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
97 ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
98 ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
99 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
100 ]))
101 }
102
103 pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
105 let schema = Self::schema();
106 let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
107 let region_ids = entries.iter().map(|e| e.region_id.as_u64());
108 let table_ids = entries.iter().map(|e| e.table_id);
109 let region_numbers = entries.iter().map(|e| e.region_number);
110 let region_groups = entries.iter().map(|e| e.region_group);
111 let region_sequences = entries.iter().map(|e| e.region_sequence);
112 let file_ids = entries.iter().map(|e| e.file_id.as_str());
113 let levels = entries.iter().map(|e| e.level);
114 let file_paths = entries.iter().map(|e| e.file_path.as_str());
115 let file_sizes = entries.iter().map(|e| e.file_size);
116 let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
117 let index_file_sizes = entries.iter().map(|e| e.index_file_size);
118 let num_rows = entries.iter().map(|e| e.num_rows);
119 let num_row_groups = entries.iter().map(|e| e.num_row_groups);
120 let min_ts = entries.iter().map(|e| {
121 e.min_ts
122 .convert_to(TimeUnit::Nanosecond)
123 .map(|ts| ts.value())
124 });
125 let max_ts = entries.iter().map(|e| {
126 e.max_ts
127 .convert_to(TimeUnit::Nanosecond)
128 .map(|ts| ts.value())
129 });
130 let sequences = entries.iter().map(|e| e.sequence);
131 let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
132 let node_ids = entries.iter().map(|e| e.node_id);
133
134 let columns: Vec<ArrayRef> = vec![
135 Arc::new(StringArray::from_iter_values(table_dirs)),
136 Arc::new(UInt64Array::from_iter_values(region_ids)),
137 Arc::new(UInt32Array::from_iter_values(table_ids)),
138 Arc::new(UInt32Array::from_iter_values(region_numbers)),
139 Arc::new(UInt8Array::from_iter_values(region_groups)),
140 Arc::new(UInt32Array::from_iter_values(region_sequences)),
141 Arc::new(StringArray::from_iter_values(file_ids)),
142 Arc::new(UInt8Array::from_iter_values(levels)),
143 Arc::new(StringArray::from_iter_values(file_paths)),
144 Arc::new(UInt64Array::from_iter_values(file_sizes)),
145 Arc::new(StringArray::from_iter(index_file_paths)),
146 Arc::new(UInt64Array::from_iter(index_file_sizes)),
147 Arc::new(UInt64Array::from_iter_values(num_rows)),
148 Arc::new(UInt64Array::from_iter_values(num_row_groups)),
149 Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
150 Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
151 Arc::new(UInt64Array::from_iter(sequences)),
152 Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
153 Arc::new(UInt64Array::from_iter(node_ids)),
154 ];
155
156 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
157 }
158
159 pub fn reserved_table_name_for_inspection() -> &'static str {
165 "__inspect/__mito/__sst_manifest"
166 }
167
168 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
170 build_plan_helper(
171 scan_request,
172 Self::reserved_table_name_for_inspection(),
173 Self::schema(),
174 )
175 }
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
180pub struct StorageSstEntry {
181 pub file_path: String,
183 pub file_size: Option<u64>,
185 pub last_modified_ms: Option<Timestamp>,
187 pub node_id: Option<u64>,
189}
190
191impl StorageSstEntry {
192 pub fn schema() -> SchemaRef {
194 use datatypes::prelude::ConcreteDataType as Ty;
195 Arc::new(Schema::new(vec![
196 ColumnSchema::new("file_path", Ty::string_datatype(), false),
197 ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
198 ColumnSchema::new(
199 "last_modified_ms",
200 Ty::timestamp_millisecond_datatype(),
201 true,
202 ),
203 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
204 ]))
205 }
206
207 pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
209 let schema = Self::schema();
210 let file_paths = entries.iter().map(|e| e.file_path.as_str());
211 let file_sizes = entries.iter().map(|e| e.file_size);
212 let last_modified_ms = entries.iter().map(|e| {
213 e.last_modified_ms
214 .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
215 });
216 let node_ids = entries.iter().map(|e| e.node_id);
217
218 let columns: Vec<ArrayRef> = vec![
219 Arc::new(StringArray::from_iter_values(file_paths)),
220 Arc::new(UInt64Array::from_iter(file_sizes)),
221 Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
222 Arc::new(UInt64Array::from_iter(node_ids)),
223 ];
224
225 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
226 }
227
228 pub fn reserved_table_name_for_inspection() -> &'static str {
234 "__inspect/__mito/__sst_storage"
235 }
236
237 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
239 build_plan_helper(
240 scan_request,
241 Self::reserved_table_name_for_inspection(),
242 Self::schema(),
243 )
244 }
245}
246
247fn build_plan_helper(
248 scan_request: ScanRequest,
249 table_name: &str,
250 schema: SchemaRef,
251) -> Result<LogicalPlan, DataFusionError> {
252 let table_source = LogicalTableSource::new(schema.arrow_schema().clone());
253
254 let mut builder = LogicalPlanBuilder::scan(
255 table_name,
256 Arc::new(table_source),
257 scan_request.projection.clone(),
258 )?;
259
260 for filter in scan_request.filters {
261 builder = builder.filter(filter)?;
262 }
263
264 if let Some(limit) = scan_request.limit {
265 builder = builder.limit(0, Some(limit))?;
266 }
267
268 builder.build()
269}
270
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    /// Downcasts column `index` of `batch` to the concrete array type `T`.
    ///
    /// Panics when the column is not a `T`.
    fn column_as<T: 'static>(batch: &DfRecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        // First region.
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        // Second region.
        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                // 1000 ms, expected as 1_000_000_000 ns below.
                min_ts: Timestamp::new_millisecond(1000),
                // 2 s, expected as 2_000_000_000 ns below.
                max_ts: Timestamp::new_second(2),
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                // Already in nanoseconds.
                min_ts: Timestamp::new_nanosecond(5),
                // 2000 us, expected as 2_000_000 ns below.
                max_ts: Timestamp::new_microsecond(2000),
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        // The batch must carry exactly the declared schema.
        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        let batch_schema = batch.schema();
        for (expected, actual) in schema
            .arrow_schema()
            .fields()
            .iter()
            .zip(batch_schema.fields().iter())
        {
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.is_nullable(), actual.is_nullable());
            assert_eq!(expected.data_type(), actual.data_type());
        }

        let table_dirs: &StringArray = column_as(&batch, 0);
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids: &UInt64Array = column_as(&batch, 1);
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids: &UInt32Array = column_as(&batch, 2);
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers: &UInt32Array = column_as(&batch, 3);
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups: &UInt8Array = column_as(&batch, 4);
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences: &UInt32Array = column_as(&batch, 5);
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids: &StringArray = column_as(&batch, 6);
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let levels: &UInt8Array = column_as(&batch, 7);
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths: &StringArray = column_as(&batch, 8);
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes: &UInt64Array = column_as(&batch, 9);
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths: &StringArray = column_as(&batch, 10);
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes: &UInt64Array = column_as(&batch, 11);
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows: &UInt64Array = column_as(&batch, 12);
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups: &UInt64Array = column_as(&batch, 13);
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let min_ts: &TimestampNanosecondArray = column_as(&batch, 14);
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts: &TimestampNanosecondArray = column_as(&batch, 15);
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences: &UInt64Array = column_as(&batch, 16);
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids: &UInt64Array = column_as(&batch, 17);
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids: &UInt64Array = column_as(&batch, 18);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths: &StringArray = column_as(&batch, 0);
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes: &UInt64Array = column_as(&batch, 1);
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified: &TimestampMillisecondArray = column_as(&batch, 2);
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids: &UInt64Array = column_as(&batch, 3);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        // The projected schema must contain exactly the requested columns, in order.
        let projected: Vec<&str> = scan
            .projected_schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(projected, ["table_dir", "region_id", "table_id"]);
    }

    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        // The projected schema must contain exactly the requested columns, in order.
        let projected: Vec<&str> = scan
            .projected_schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(projected, ["file_path", "last_modified_ms"]);
    }

    /// Walks down a `Filter` / `Limit` chain to the underlying table scan.
    ///
    /// Returns the scan plus flags telling whether a filter node and a limit
    /// node were seen on the way. Panics on any other plan node.
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        let mut has_filter = false;
        let mut has_limit = false;
        let mut node = plan;

        loop {
            match node {
                LogicalPlan::Filter(filter) => {
                    has_filter = true;
                    node = &*filter.input;
                }
                LogicalPlan::Limit(limit) => {
                    has_limit = true;
                    node = &*limit.input;
                }
                LogicalPlan::TableScan(scan) => return (scan, has_filter, has_limit),
                other => panic!("unexpected plan: {other:?}"),
            }
        }
    }
}