use std::sync::Arc;

use common_recordbatch::DfRecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
    UInt32Array, UInt64Array,
};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow_array::StringArray;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Serialize};

use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
/// An SST file entry collected from the region manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// Directory of the table that owns the file.
    pub table_dir: String,
    /// Id of the region that owns the file.
    pub region_id: RegionId,
    /// Id of the table that owns the file.
    pub table_id: TableId,
    /// Region number (region group in the high 8 bits, region sequence in the low 24 bits).
    pub region_number: RegionNumber,
    /// Group part of the region number.
    pub region_group: RegionGroup,
    /// Sequence part of the region number.
    pub region_sequence: RegionSeq,
    /// Id of the SST file.
    pub file_id: String,
    /// Id of the associated index file, if any.
    pub index_file_id: Option<String>,
    /// Level of the SST file.
    pub level: u8,
    /// Path of the SST file.
    pub file_path: String,
    /// Size of the SST file in bytes.
    pub file_size: u64,
    /// Path of the associated index file, if any.
    pub index_file_path: Option<String>,
    /// Size of the index file in bytes, if any.
    pub index_file_size: Option<u64>,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Number of time series in the file, if known.
    pub num_series: Option<u64>,
    /// Minimal timestamp in the file.
    pub min_ts: Timestamp,
    /// Maximal timestamp in the file.
    pub max_ts: Timestamp,
    /// Sequence number associated with the file, if known.
    pub sequence: Option<u64>,
    /// Id of the region the file originally belonged to.
    pub origin_region_id: RegionId,
    /// Id of the node that reports the entry, if known.
    pub node_id: Option<u64>,
    /// Whether the file is visible.
    pub visible: bool,
}

impl ManifestSstEntry {
    /// Returns the schema of [`ManifestSstEntry`].
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
            ColumnSchema::new("level", Ty::uint8_datatype(), false),
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_series", Ty::uint64_datatype(), true),
            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
            ColumnSchema::new("visible", Ty::boolean_datatype(), false),
        ]))
    }

    /// Converts the entries to a [`DfRecordBatch`] whose columns follow the
    /// order of [`Self::schema()`]; `min_ts` and `max_ts` are converted to
    /// nanosecond precision.
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
        let levels = entries.iter().map(|e| e.level);
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let num_rows = entries.iter().map(|e| e.num_rows);
        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
        let num_series = entries.iter().map(|e| e.num_series);
        let min_ts = entries.iter().map(|e| {
            e.min_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let max_ts = entries.iter().map(|e| {
            e.max_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let sequences = entries.iter().map(|e| e.sequence);
        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
        let node_ids = entries.iter().map(|e| e.node_id);
        let visible_flags = entries.iter().map(|e| Some(e.visible));

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(StringArray::from_iter(index_file_ids)),
            Arc::new(UInt8Array::from_iter_values(levels)),
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter_values(file_sizes)),
            Arc::new(StringArray::from_iter(index_file_paths)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(UInt64Array::from_iter_values(num_rows)),
            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
            Arc::new(UInt64Array::from_iter(num_series)),
            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
            Arc::new(UInt64Array::from_iter(sequences)),
            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
            Arc::new(UInt64Array::from_iter(node_ids)),
            Arc::new(BooleanArray::from_iter(visible_flags)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Returns the reserved table name of the manifest SST inspection table.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_manifest"
    }

    /// Builds a logical plan that scans this inspection table with the given request.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}
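
// Usage sketch (illustrative only): callers typically materialize collected
// entries with `ManifestSstEntry::to_record_batch(&entries)` and expose the
// reserved table through `ManifestSstEntry::build_plan(scan_request)`, so the
// inspection data can be queried like an ordinary table.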

/// An SST file entry discovered from the storage layer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Path of the SST file.
    pub file_path: String,
    /// Size of the SST file in bytes, if known.
    pub file_size: Option<u64>,
    /// Last modified time of the SST file, if known.
    pub last_modified_ms: Option<Timestamp>,
    /// Id of the node that reports the entry, if known.
    pub node_id: Option<u64>,
}

impl StorageSstEntry {
    /// Returns the schema of [`StorageSstEntry`].
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new(
                "last_modified_ms",
                Ty::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts the entries to a [`DfRecordBatch`] whose columns follow the
    /// order of [`Self::schema()`]; `last_modified_ms` is converted to
    /// millisecond precision.
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let last_modified_ms = entries.iter().map(|e| {
            e.last_modified_ms
                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
        });
        let node_ids = entries.iter().map(|e| e.node_id);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter(file_sizes)),
            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Returns the reserved table name of the storage SST inspection table.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_storage"
    }

    /// Builds a logical plan that scans this inspection table with the given request.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

/// Index blob metadata read from a puffin index file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PuffinIndexMetaEntry {
    /// Directory of the table that owns the index file.
    pub table_dir: String,
    /// Path of the puffin index file.
    pub index_file_path: String,
    /// Id of the region that owns the index file.
    pub region_id: RegionId,
    /// Id of the table that owns the index file.
    pub table_id: TableId,
    /// Region number of the owning region.
    pub region_number: RegionNumber,
    /// Group part of the region number.
    pub region_group: RegionGroup,
    /// Sequence part of the region number.
    pub region_sequence: RegionSeq,
    /// Id of the indexed SST file.
    pub file_id: String,
    /// Size of the index file in bytes, if known.
    pub index_file_size: Option<u64>,
    /// Type of the index (e.g. "bloom_filter" or "inverted").
    pub index_type: String,
    /// Type of the index target.
    pub target_type: String,
    /// Key of the index target.
    pub target_key: String,
    /// JSON representation of the index target.
    pub target_json: String,
    /// Size of the index blob in bytes.
    pub blob_size: u64,
    /// JSON metadata of the index blob, if any.
    pub meta_json: Option<String>,
    /// Id of the node that reports the entry, if known.
    pub node_id: Option<u64>,
}

impl PuffinIndexMetaEntry {
    /// Returns the schema of [`PuffinIndexMetaEntry`].
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("index_type", Ty::string_datatype(), false),
            ColumnSchema::new("target_type", Ty::string_datatype(), false),
            ColumnSchema::new("target_key", Ty::string_datatype(), false),
            ColumnSchema::new("target_json", Ty::string_datatype(), false),
            ColumnSchema::new("blob_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("meta_json", Ty::string_datatype(), true),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts the entries to a [`DfRecordBatch`] whose columns follow the
    /// order of [`Self::schema()`].
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let index_types = entries.iter().map(|e| e.index_type.as_str());
        let target_types = entries.iter().map(|e| e.target_type.as_str());
        let target_keys = entries.iter().map(|e| e.target_key.as_str());
        let target_jsons = entries.iter().map(|e| e.target_json.as_str());
        let blob_sizes = entries.iter().map(|e| e.blob_size);
        let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref());
        let node_ids = entries.iter().map(|e| e.node_id);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(StringArray::from_iter_values(index_file_paths)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(StringArray::from_iter_values(index_types)),
            Arc::new(StringArray::from_iter_values(target_types)),
            Arc::new(StringArray::from_iter_values(target_keys)),
            Arc::new(StringArray::from_iter_values(target_jsons)),
            Arc::new(UInt64Array::from_iter_values(blob_sizes)),
            Arc::new(StringArray::from_iter(meta_jsons)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Returns the reserved table name of the puffin index inspection table.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__puffin_index_meta"
    }

    /// Builds a logical plan that scans this inspection table with the given request.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

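/// Builds a logical plan over a synthetic [`LogicalTableSource`]: a table scan
/// with the request's projection, followed by the request's filters and an
/// optional limit.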
fn build_plan_helper(
    scan_request: ScanRequest,
    table_name: &str,
    schema: SchemaRef,
) -> Result<LogicalPlan, DataFusionError> {
    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());

    let mut builder = LogicalPlanBuilder::scan(
        table_name,
        Arc::new(table_source),
        scan_request.projection.clone(),
    )?;

    for filter in scan_request.filters {
        builder = builder.filter(filter)?;
    }

    if let Some(limit) = scan_request.limit {
        builder = builder.limit(0, Some(limit))?;
    }

    builder.build()
}

#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                index_file_id: None,
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                num_series: Some(5),
                min_ts: Timestamp::new_millisecond(1000),
                max_ts: Timestamp::new_second(2),
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
                visible: false,
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                index_file_id: Some("idx".to_string()),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                num_series: None,
                min_ts: Timestamp::new_nanosecond(5),
                max_ts: Timestamp::new_microsecond(2000),
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
                visible: true,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
            assert_eq!(f.name(), batch.schema().field(i).name());
            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
        }

        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids = batch
            .column(6)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let index_file_ids = batch
            .column(7)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_ids.is_null(0));
        assert_eq!("idx", index_file_ids.value(1));

        let levels = batch
            .column(8)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths = batch
            .column(9)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes = batch
            .column(10)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths = batch
            .column(11)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = batch
            .column(12)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups = batch
            .column(14)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let num_series = batch
            .column(15)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(5, num_series.value(0));
        assert!(num_series.is_null(1));

        let min_ts = batch
            .column(16)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts = batch
            .column(17)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences = batch
            .column(18)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = batch
            .column(19)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids = batch
            .column(20)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));

        let visible = batch
            .column(21)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!visible.value(0));
        assert!(visible.value(1));
    }
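
    // Edge-case sketch (added here; not part of the original suite): an empty
    // entry slice should still produce a batch with the full schema and zero rows.
    #[test]
    fn test_manifest_to_record_batch_empty_entries() {
        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&[]).unwrap();
        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(0, batch.num_rows());
    }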

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_puffin_index_meta_to_record_batch() {
        let entries = vec![
            PuffinIndexMetaEntry {
                table_dir: "table1".to_string(),
                index_file_path: "index1".to_string(),
                region_id: RegionId::with_group_and_seq(10, 0, 20),
                table_id: 10,
                region_number: 20,
                region_group: 0,
                region_sequence: 20,
                file_id: "file1".to_string(),
                index_file_size: Some(1024),
                index_type: "bloom_filter".to_string(),
                target_type: "column".to_string(),
                target_key: "1".to_string(),
                target_json: "{\"column\":1}".to_string(),
                blob_size: 256,
                meta_json: Some("{\"bloom\":{}}".to_string()),
                node_id: Some(42),
            },
            PuffinIndexMetaEntry {
                table_dir: "table2".to_string(),
                index_file_path: "index2".to_string(),
                region_id: RegionId::with_group_and_seq(11, 0, 21),
                table_id: 11,
                region_number: 21,
                region_group: 0,
                region_sequence: 21,
                file_id: "file2".to_string(),
                index_file_size: None,
                index_type: "inverted".to_string(),
                target_type: "unknown".to_string(),
                target_key: "legacy".to_string(),
                target_json: "{}".to_string(),
                blob_size: 0,
                meta_json: None,
                node_id: None,
            },
        ];

        let schema = PuffinIndexMetaEntry::schema();
        let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("table1", table_dirs.value(0));
        assert_eq!("table2", table_dirs.value(1));

        let index_file_paths = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("index1", index_file_paths.value(0));
        assert_eq!("index2", index_file_paths.value(1));

        let region_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(
            RegionId::with_group_and_seq(10, 0, 20).as_u64(),
            region_ids.value(0)
        );
        assert_eq!(
            RegionId::with_group_and_seq(11, 0, 21).as_u64(),
            region_ids.value(1)
        );

        let table_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(10, table_ids.value(0));
        assert_eq!(11, table_ids.value(1));

        let region_numbers = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(20, region_numbers.value(0));
        assert_eq!(21, region_numbers.value(1));

        let region_groups = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(0, region_groups.value(0));
        assert_eq!(0, region_groups.value(1));

        let region_sequences = batch
            .column(6)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(20, region_sequences.value(0));
        assert_eq!(21, region_sequences.value(1));

        let file_ids = batch
            .column(7)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("file1", file_ids.value(0));
        assert_eq!("file2", file_ids.value(1));

        let index_file_sizes = batch
            .column(8)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1024, index_file_sizes.value(0));
        assert!(index_file_sizes.is_null(1));

        let index_types = batch
            .column(9)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("bloom_filter", index_types.value(0));
        assert_eq!("inverted", index_types.value(1));

        let target_types = batch
            .column(10)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("column", target_types.value(0));
        assert_eq!("unknown", target_types.value(1));

        let target_keys = batch
            .column(11)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("1", target_keys.value(0));
        assert_eq!("legacy", target_keys.value(1));

        let target_json = batch
            .column(12)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("{\"column\":1}", target_json.value(0));
        assert_eq!("{}", target_json.value(1));

        let blob_sizes = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(256, blob_sizes.value(0));
        assert_eq!(0, blob_sizes.value(1));

        let meta_jsons = batch
            .column(14)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("{\"bloom\":{}}", meta_jsons.value(0));
        assert!(meta_jsons.is_null(1));

        let node_ids = batch
            .column(15)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(42, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();

        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "region_id");
        assert_eq!(fields[2].name(), "table_id");
    }

    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "file_path");
        assert_eq!(fields[1].name(), "last_modified_ms");
    }
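
    // Sketch mirroring the manifest/storage plan tests above for the puffin
    // index inspection table; projection indices follow
    // `PuffinIndexMetaEntry::schema()` (0: table_dir, 1: index_file_path, 3: table_id).
    #[test]
    fn test_puffin_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 1, 3]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(2),
            ..Default::default()
        };

        let plan = PuffinIndexMetaEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(PuffinIndexMetaEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 3]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "index_file_path");
        assert_eq!(fields[2].name(), "table_id");
    }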

    /// Walks the plan and returns the inner table scan, plus flags for whether
    /// a filter node and a limit node were seen along the way.
    fn extract_scan(
        plan: &LogicalPlan,
    ) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        match plan {
            LogicalPlan::Filter(f) => {
                let (scan, _, has_limit) = extract_scan(&f.input);
                (scan, true, has_limit)
            }
            LogicalPlan::Limit(Limit { input, .. }) => {
                let (scan, has_filter, _) = extract_scan(input);
                (scan, has_filter, true)
            }
            LogicalPlan::TableScan(scan) => (scan, false, false),
            other => panic!("unexpected plan: {other:?}"),
        }
    }
}