1use std::sync::Arc;
16
17use bytes::Bytes;
18use common_recordbatch::DfRecordBatch;
19use common_time::Timestamp;
20use common_time::timestamp::TimeUnit;
21use datafusion_common::DataFusionError;
22use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
23use datatypes::arrow::array::{
24 ArrayRef, BinaryArray, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray,
25 UInt8Array, UInt32Array, UInt64Array,
26};
27use datatypes::arrow::error::ArrowError;
28use datatypes::arrow_array::StringArray;
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use serde::{Deserialize, Serialize};
31
32use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
33
/// Index type name `"bloom_filter"`.
///
/// NOTE(review): presumably one of the values carried by
/// `PuffinIndexMetaEntry::index_type` — confirm against the producers.
pub const PUFFIN_INDEX_TYPE_BLOOM_FILTER: &str = "bloom_filter";
/// Index type name `"fulltext_bloom"`.
pub const PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom";
/// Index type name `"fulltext_tantivy"`.
pub const PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy";
/// Index type name `"inverted"`.
pub const PUFFIN_INDEX_TYPE_INVERTED: &str = "inverted";
42
/// One SST file entry, shaped as a row of the inspection table named by
/// [`ManifestSstEntry::reserved_table_name_for_inspection`].
///
/// Field order matches the column order of [`ManifestSstEntry::schema`]; `Option`
/// fields map to nullable columns there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// Table directory; emitted as the non-null `table_dir` string column.
    pub table_dir: String,
    /// Region id; emitted through `RegionId::as_u64` as a `u64` column.
    pub region_id: RegionId,
    /// Table id (`u32` column).
    pub table_id: TableId,
    /// Region number (`u32` column). The tests in this file compose it as
    /// `(region_group << 24) | region_sequence`.
    pub region_number: RegionNumber,
    /// Region group (`u8` column).
    pub region_group: RegionGroup,
    /// Region sequence (`u32` column).
    pub region_sequence: RegionSeq,
    /// File id, stored as a string.
    pub file_id: String,
    /// Index version (non-null `u64` column).
    pub index_version: u64,
    /// Level of the file (`u8` column).
    /// NOTE(review): presumably the compaction level — confirm.
    pub level: u8,
    /// Path of the SST file.
    pub file_path: String,
    /// Size of the SST file. NOTE(review): unit is presumably bytes — confirm.
    pub file_size: u64,
    /// Path of the index file, if any.
    pub index_file_path: Option<String>,
    /// Size of the index file, if any.
    pub index_file_size: Option<u64>,
    /// Number of rows.
    pub num_rows: u64,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Number of series, if known.
    pub num_series: Option<u64>,
    /// Minimum timestamp; converted to nanoseconds when emitted as a column.
    pub min_ts: Timestamp,
    /// Maximum timestamp; converted to nanoseconds when emitted as a column.
    pub max_ts: Timestamp,
    /// Optional sequence number.
    /// NOTE(review): exact semantics are not visible in this file.
    pub sequence: Option<u64>,
    /// Origin region id; emitted through `RegionId::as_u64` as a `u64` column.
    /// NOTE(review): how it differs from `region_id` is not visible in this file.
    pub origin_region_id: RegionId,
    /// Optional node id.
    pub node_id: Option<u64>,
    /// Visibility flag (non-null boolean column).
    pub visible: bool,
    /// Optional minimum primary key bytes (nullable binary column).
    pub primary_key_min: Option<Bytes>,
    /// Optional maximum primary key bytes (nullable binary column).
    pub primary_key_max: Option<Bytes>,
}
95
96impl ManifestSstEntry {
97 pub fn schema() -> SchemaRef {
99 use datatypes::prelude::ConcreteDataType as Ty;
100 Arc::new(Schema::new(vec![
101 ColumnSchema::new("table_dir", Ty::string_datatype(), false),
102 ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
103 ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
104 ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
105 ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
106 ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
107 ColumnSchema::new("file_id", Ty::string_datatype(), false),
108 ColumnSchema::new("index_version", Ty::uint64_datatype(), false),
109 ColumnSchema::new("level", Ty::uint8_datatype(), false),
110 ColumnSchema::new("file_path", Ty::string_datatype(), false),
111 ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
112 ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
113 ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
114 ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
115 ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
116 ColumnSchema::new("num_series", Ty::uint64_datatype(), true),
117 ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
118 ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
119 ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
120 ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
121 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
122 ColumnSchema::new("visible", Ty::boolean_datatype(), false),
123 ColumnSchema::new("primary_key_min", Ty::binary_datatype(), true),
124 ColumnSchema::new("primary_key_max", Ty::binary_datatype(), true),
125 ]))
126 }
127
128 pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
130 let schema = Self::schema();
131 let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
132 let region_ids = entries.iter().map(|e| e.region_id.as_u64());
133 let table_ids = entries.iter().map(|e| e.table_id);
134 let region_numbers = entries.iter().map(|e| e.region_number);
135 let region_groups = entries.iter().map(|e| e.region_group);
136 let region_sequences = entries.iter().map(|e| e.region_sequence);
137 let file_ids = entries.iter().map(|e| e.file_id.as_str());
138 let index_versions = entries.iter().map(|e| e.index_version);
139 let levels = entries.iter().map(|e| e.level);
140 let file_paths = entries.iter().map(|e| e.file_path.as_str());
141 let file_sizes = entries.iter().map(|e| e.file_size);
142 let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
143 let index_file_sizes = entries.iter().map(|e| e.index_file_size);
144 let num_rows = entries.iter().map(|e| e.num_rows);
145 let num_row_groups = entries.iter().map(|e| e.num_row_groups);
146 let num_series = entries.iter().map(|e| e.num_series);
147 let min_ts = entries.iter().map(|e| {
148 e.min_ts
149 .convert_to(TimeUnit::Nanosecond)
150 .map(|ts| ts.value())
151 });
152 let max_ts = entries.iter().map(|e| {
153 e.max_ts
154 .convert_to(TimeUnit::Nanosecond)
155 .map(|ts| ts.value())
156 });
157 let sequences = entries.iter().map(|e| e.sequence);
158 let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
159 let node_ids = entries.iter().map(|e| e.node_id);
160 let visible_flags = entries.iter().map(|e| Some(e.visible));
161 let primary_key_min = entries.iter().map(|e| e.primary_key_min.as_deref());
162 let primary_key_max = entries.iter().map(|e| e.primary_key_max.as_deref());
163
164 let columns: Vec<ArrayRef> = vec![
165 Arc::new(StringArray::from_iter_values(table_dirs)),
166 Arc::new(UInt64Array::from_iter_values(region_ids)),
167 Arc::new(UInt32Array::from_iter_values(table_ids)),
168 Arc::new(UInt32Array::from_iter_values(region_numbers)),
169 Arc::new(UInt8Array::from_iter_values(region_groups)),
170 Arc::new(UInt32Array::from_iter_values(region_sequences)),
171 Arc::new(StringArray::from_iter_values(file_ids)),
172 Arc::new(UInt64Array::from_iter(index_versions)),
173 Arc::new(UInt8Array::from_iter_values(levels)),
174 Arc::new(StringArray::from_iter_values(file_paths)),
175 Arc::new(UInt64Array::from_iter_values(file_sizes)),
176 Arc::new(StringArray::from_iter(index_file_paths)),
177 Arc::new(UInt64Array::from_iter(index_file_sizes)),
178 Arc::new(UInt64Array::from_iter_values(num_rows)),
179 Arc::new(UInt64Array::from_iter_values(num_row_groups)),
180 Arc::new(UInt64Array::from_iter(num_series)),
181 Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
182 Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
183 Arc::new(UInt64Array::from_iter(sequences)),
184 Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
185 Arc::new(UInt64Array::from_iter(node_ids)),
186 Arc::new(BooleanArray::from_iter(visible_flags)),
187 Arc::new(BinaryArray::from_iter(primary_key_min)),
188 Arc::new(BinaryArray::from_iter(primary_key_max)),
189 ];
190
191 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
192 }
193
194 pub fn reserved_table_name_for_inspection() -> &'static str {
200 "__inspect/__mito/__sst_manifest"
201 }
202
203 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
205 build_plan_helper(
206 scan_request,
207 Self::reserved_table_name_for_inspection(),
208 Self::schema(),
209 )
210 }
211}
212
/// One file entry, shaped as a row of the inspection table named by
/// [`StorageSstEntry::reserved_table_name_for_inspection`].
///
/// Field order matches the column order of [`StorageSstEntry::schema`].
/// NOTE(review): presumably produced by listing the storage backend — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Path of the file (non-null string column).
    pub file_path: String,
    /// Size of the file, if known.
    pub file_size: Option<u64>,
    /// Last modification time, if known; converted to milliseconds when emitted.
    pub last_modified_ms: Option<Timestamp>,
    /// Optional node id.
    pub node_id: Option<u64>,
}
225
226impl StorageSstEntry {
227 pub fn schema() -> SchemaRef {
229 use datatypes::prelude::ConcreteDataType as Ty;
230 Arc::new(Schema::new(vec![
231 ColumnSchema::new("file_path", Ty::string_datatype(), false),
232 ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
233 ColumnSchema::new(
234 "last_modified_ms",
235 Ty::timestamp_millisecond_datatype(),
236 true,
237 ),
238 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
239 ]))
240 }
241
242 pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
244 let schema = Self::schema();
245 let file_paths = entries.iter().map(|e| e.file_path.as_str());
246 let file_sizes = entries.iter().map(|e| e.file_size);
247 let last_modified_ms = entries.iter().map(|e| {
248 e.last_modified_ms
249 .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
250 });
251 let node_ids = entries.iter().map(|e| e.node_id);
252
253 let columns: Vec<ArrayRef> = vec![
254 Arc::new(StringArray::from_iter_values(file_paths)),
255 Arc::new(UInt64Array::from_iter(file_sizes)),
256 Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
257 Arc::new(UInt64Array::from_iter(node_ids)),
258 ];
259
260 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
261 }
262
263 pub fn reserved_table_name_for_inspection() -> &'static str {
269 "__inspect/__mito/__sst_storage"
270 }
271
272 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
274 build_plan_helper(
275 scan_request,
276 Self::reserved_table_name_for_inspection(),
277 Self::schema(),
278 )
279 }
280}
281
/// One index metadata entry, shaped as a row of the inspection table named by
/// [`PuffinIndexMetaEntry::reserved_table_name_for_inspection`].
///
/// Field order matches the column order of [`PuffinIndexMetaEntry::schema`].
/// NOTE(review): presumably one entry per blob inside a puffin index file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PuffinIndexMetaEntry {
    /// Table directory (non-null string column).
    pub table_dir: String,
    /// Path of the index file.
    pub index_file_path: String,
    /// Region id; emitted through `RegionId::as_u64` as a `u64` column.
    pub region_id: RegionId,
    /// Table id (`u32` column).
    pub table_id: TableId,
    /// Region number (`u32` column).
    pub region_number: RegionNumber,
    /// Region group (`u8` column).
    pub region_group: RegionGroup,
    /// Region sequence (`u32` column).
    pub region_sequence: RegionSeq,
    /// File id, stored as a string.
    pub file_id: String,
    /// Size of the index file, if known.
    pub index_file_size: Option<u64>,
    /// Index type name, e.g. `"bloom_filter"` or `"inverted"` as used by the tests
    /// in this file.
    pub index_type: String,
    /// Kind of target the index applies to, e.g. `"column"` in the tests.
    pub target_type: String,
    /// Key identifying the target, stored as a string.
    pub target_key: String,
    /// Target description as a JSON string, e.g. `{"column":1}` in the tests.
    pub target_json: String,
    /// Size of the blob. NOTE(review): unit is presumably bytes — confirm.
    pub blob_size: u64,
    /// Optional metadata as a JSON string.
    pub meta_json: Option<String>,
    /// Optional node id.
    pub node_id: Option<u64>,
}
318
319impl PuffinIndexMetaEntry {
320 pub fn schema() -> SchemaRef {
322 use datatypes::prelude::ConcreteDataType as Ty;
323 Arc::new(Schema::new(vec![
324 ColumnSchema::new("table_dir", Ty::string_datatype(), false),
325 ColumnSchema::new("index_file_path", Ty::string_datatype(), false),
326 ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
327 ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
328 ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
329 ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
330 ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
331 ColumnSchema::new("file_id", Ty::string_datatype(), false),
332 ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
333 ColumnSchema::new("index_type", Ty::string_datatype(), false),
334 ColumnSchema::new("target_type", Ty::string_datatype(), false),
335 ColumnSchema::new("target_key", Ty::string_datatype(), false),
336 ColumnSchema::new("target_json", Ty::string_datatype(), false),
337 ColumnSchema::new("blob_size", Ty::uint64_datatype(), false),
338 ColumnSchema::new("meta_json", Ty::string_datatype(), true),
339 ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
340 ]))
341 }
342
343 pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
345 let schema = Self::schema();
346 let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
347 let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str());
348 let region_ids = entries.iter().map(|e| e.region_id.as_u64());
349 let table_ids = entries.iter().map(|e| e.table_id);
350 let region_numbers = entries.iter().map(|e| e.region_number);
351 let region_groups = entries.iter().map(|e| e.region_group);
352 let region_sequences = entries.iter().map(|e| e.region_sequence);
353 let file_ids = entries.iter().map(|e| e.file_id.as_str());
354 let index_file_sizes = entries.iter().map(|e| e.index_file_size);
355 let index_types = entries.iter().map(|e| e.index_type.as_str());
356 let target_types = entries.iter().map(|e| e.target_type.as_str());
357 let target_keys = entries.iter().map(|e| e.target_key.as_str());
358 let target_jsons = entries.iter().map(|e| e.target_json.as_str());
359 let blob_sizes = entries.iter().map(|e| e.blob_size);
360 let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref());
361 let node_ids = entries.iter().map(|e| e.node_id);
362
363 let columns: Vec<ArrayRef> = vec![
364 Arc::new(StringArray::from_iter_values(table_dirs)),
365 Arc::new(StringArray::from_iter_values(index_file_paths)),
366 Arc::new(UInt64Array::from_iter_values(region_ids)),
367 Arc::new(UInt32Array::from_iter_values(table_ids)),
368 Arc::new(UInt32Array::from_iter_values(region_numbers)),
369 Arc::new(UInt8Array::from_iter_values(region_groups)),
370 Arc::new(UInt32Array::from_iter_values(region_sequences)),
371 Arc::new(StringArray::from_iter_values(file_ids)),
372 Arc::new(UInt64Array::from_iter(index_file_sizes)),
373 Arc::new(StringArray::from_iter_values(index_types)),
374 Arc::new(StringArray::from_iter_values(target_types)),
375 Arc::new(StringArray::from_iter_values(target_keys)),
376 Arc::new(StringArray::from_iter_values(target_jsons)),
377 Arc::new(UInt64Array::from_iter_values(blob_sizes)),
378 Arc::new(StringArray::from_iter(meta_jsons)),
379 Arc::new(UInt64Array::from_iter(node_ids)),
380 ];
381
382 DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
383 }
384
385 pub fn reserved_table_name_for_inspection() -> &'static str {
387 "__inspect/__mito/__puffin_index_meta"
388 }
389
390 pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
392 build_plan_helper(
393 scan_request,
394 Self::reserved_table_name_for_inspection(),
395 Self::schema(),
396 )
397 }
398}
399
400fn build_plan_helper(
401 scan_request: ScanRequest,
402 table_name: &str,
403 schema: SchemaRef,
404) -> Result<LogicalPlan, DataFusionError> {
405 let table_source = LogicalTableSource::new(schema.arrow_schema().clone());
406
407 let projection = scan_request.projection_input.map(|input| input.projection);
408 let mut builder = LogicalPlanBuilder::scan(table_name, Arc::new(table_source), projection)?;
409
410 for filter in scan_request.filters {
411 builder = builder.filter(filter)?;
412 }
413
414 if let Some(limit) = scan_request.limit {
415 builder = builder.limit(0, Some(limit))?;
416 }
417
418 builder.build()
419}
420
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, BinaryArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
        UInt32Array, UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    /// Downcasts column `index` of `batch` to the concrete array type `T`.
    ///
    /// Panics when the column holds a different array type, which fails the
    /// calling test with a message naming the offending column.
    fn column_as<T: 'static>(batch: &DfRecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column {index} has an unexpected array type"))
    }

    /// Walks a `Filter`/`Limit` chain down to the table scan and reports whether a
    /// filter and a limit node were seen on the way. Any other node panics.
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        match plan {
            LogicalPlan::Filter(f) => {
                let (scan, _, has_limit) = extract_scan(&f.input);
                (scan, true, has_limit)
            }
            LogicalPlan::Limit(Limit { input, .. }) => {
                let (scan, has_filter, _) = extract_scan(input);
                (scan, has_filter, true)
            }
            LogicalPlan::TableScan(scan) => (scan, false, false),
            other => panic!("unexpected plan: {other:?}"),
        }
    }

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                index_version: 0,
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                num_series: Some(5),
                // 1000 ms -> 1_000_000_000 ns
                min_ts: Timestamp::new_millisecond(1000),
                // 2 s -> 2_000_000_000 ns
                max_ts: Timestamp::new_second(2),
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
                visible: false,
                primary_key_min: Some(Bytes::from_static(b"aaa")),
                primary_key_max: Some(Bytes::from_static(b"zzz")),
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                index_version: 1,
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                num_series: None,
                // already in nanoseconds
                min_ts: Timestamp::new_nanosecond(5),
                // 2000 us -> 2_000_000 ns
                max_ts: Timestamp::new_microsecond(2000),
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
                visible: true,
                primary_key_min: None,
                primary_key_max: None,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        // The batch must carry exactly the declared schema.
        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
            assert_eq!(f.name(), batch.schema().field(i).name());
            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
        }

        let table_dirs = column_as::<StringArray>(&batch, 0);
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids = column_as::<UInt64Array>(&batch, 1);
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids = column_as::<UInt32Array>(&batch, 2);
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers = column_as::<UInt32Array>(&batch, 3);
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups = column_as::<UInt8Array>(&batch, 4);
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences = column_as::<UInt32Array>(&batch, 5);
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids = column_as::<StringArray>(&batch, 6);
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let index_versions = column_as::<UInt64Array>(&batch, 7);
        assert_eq!(0, index_versions.value(0));
        assert_eq!(1, index_versions.value(1));

        let levels = column_as::<UInt8Array>(&batch, 8);
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths = column_as::<StringArray>(&batch, 9);
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes = column_as::<UInt64Array>(&batch, 10);
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths = column_as::<StringArray>(&batch, 11);
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = column_as::<UInt64Array>(&batch, 12);
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = column_as::<UInt64Array>(&batch, 13);
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups = column_as::<UInt64Array>(&batch, 14);
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let num_series = column_as::<UInt64Array>(&batch, 15);
        assert_eq!(5, num_series.value(0));
        assert!(num_series.is_null(1));

        let min_ts = column_as::<TimestampNanosecondArray>(&batch, 16);
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts = column_as::<TimestampNanosecondArray>(&batch, 17);
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences = column_as::<UInt64Array>(&batch, 18);
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = column_as::<UInt64Array>(&batch, 19);
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids = column_as::<UInt64Array>(&batch, 20);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));

        let visible = column_as::<BooleanArray>(&batch, 21);
        assert!(!visible.value(0));
        assert!(visible.value(1));

        let primary_key_min = column_as::<BinaryArray>(&batch, 22);
        assert_eq!(b"aaa", primary_key_min.value(0));
        assert!(primary_key_min.is_null(1));

        let primary_key_max = column_as::<BinaryArray>(&batch, 23);
        assert_eq!(b"zzz", primary_key_max.value(0));
        assert!(primary_key_max.is_null(1));
    }

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = column_as::<StringArray>(&batch, 0);
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes = column_as::<UInt64Array>(&batch, 1);
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = column_as::<TimestampMillisecondArray>(&batch, 2);
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = column_as::<UInt64Array>(&batch, 3);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_puffin_index_meta_to_record_batch() {
        let entries = vec![
            PuffinIndexMetaEntry {
                table_dir: "table1".to_string(),
                index_file_path: "index1".to_string(),
                region_id: RegionId::with_group_and_seq(10, 0, 20),
                table_id: 10,
                region_number: 20,
                region_group: 0,
                region_sequence: 20,
                file_id: "file1".to_string(),
                index_file_size: Some(1024),
                index_type: "bloom_filter".to_string(),
                target_type: "column".to_string(),
                target_key: "1".to_string(),
                target_json: "{\"column\":1}".to_string(),
                blob_size: 256,
                meta_json: Some("{\"bloom\":{}}".to_string()),
                node_id: Some(42),
            },
            PuffinIndexMetaEntry {
                table_dir: "table2".to_string(),
                index_file_path: "index2".to_string(),
                region_id: RegionId::with_group_and_seq(11, 0, 21),
                table_id: 11,
                region_number: 21,
                region_group: 0,
                region_sequence: 21,
                file_id: "file2".to_string(),
                index_file_size: None,
                index_type: "inverted".to_string(),
                target_type: "unknown".to_string(),
                target_key: "legacy".to_string(),
                target_json: "{}".to_string(),
                blob_size: 0,
                meta_json: None,
                node_id: None,
            },
        ];

        let schema = PuffinIndexMetaEntry::schema();
        let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let table_dirs = column_as::<StringArray>(&batch, 0);
        assert_eq!("table1", table_dirs.value(0));
        assert_eq!("table2", table_dirs.value(1));

        let index_file_paths = column_as::<StringArray>(&batch, 1);
        assert_eq!("index1", index_file_paths.value(0));
        assert_eq!("index2", index_file_paths.value(1));

        let region_ids = column_as::<UInt64Array>(&batch, 2);
        assert_eq!(
            RegionId::with_group_and_seq(10, 0, 20).as_u64(),
            region_ids.value(0)
        );
        assert_eq!(
            RegionId::with_group_and_seq(11, 0, 21).as_u64(),
            region_ids.value(1)
        );

        let table_ids = column_as::<UInt32Array>(&batch, 3);
        assert_eq!(10, table_ids.value(0));
        assert_eq!(11, table_ids.value(1));

        let region_numbers = column_as::<UInt32Array>(&batch, 4);
        assert_eq!(20, region_numbers.value(0));
        assert_eq!(21, region_numbers.value(1));

        let region_groups = column_as::<UInt8Array>(&batch, 5);
        assert_eq!(0, region_groups.value(0));
        assert_eq!(0, region_groups.value(1));

        let region_sequences = column_as::<UInt32Array>(&batch, 6);
        assert_eq!(20, region_sequences.value(0));
        assert_eq!(21, region_sequences.value(1));

        let file_ids = column_as::<StringArray>(&batch, 7);
        assert_eq!("file1", file_ids.value(0));
        assert_eq!("file2", file_ids.value(1));

        let index_file_sizes = column_as::<UInt64Array>(&batch, 8);
        assert_eq!(1024, index_file_sizes.value(0));
        assert!(index_file_sizes.is_null(1));

        let index_types = column_as::<StringArray>(&batch, 9);
        assert_eq!("bloom_filter", index_types.value(0));
        assert_eq!("inverted", index_types.value(1));

        let target_types = column_as::<StringArray>(&batch, 10);
        assert_eq!("column", target_types.value(0));
        assert_eq!("unknown", target_types.value(1));

        let target_keys = column_as::<StringArray>(&batch, 11);
        assert_eq!("1", target_keys.value(0));
        assert_eq!("legacy", target_keys.value(1));

        let target_json = column_as::<StringArray>(&batch, 12);
        assert_eq!("{\"column\":1}", target_json.value(0));
        assert_eq!("{}", target_json.value(1));

        let blob_sizes = column_as::<UInt64Array>(&batch, 13);
        assert_eq!(256, blob_sizes.value(0));
        assert_eq!(0, blob_sizes.value(1));

        let meta_jsons = column_as::<StringArray>(&batch, 14);
        assert_eq!("{\"bloom\":{}}", meta_jsons.value(0));
        assert!(meta_jsons.is_null(1));

        let node_ids = column_as::<UInt64Array>(&batch, 15);
        assert_eq!(42, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        // Project the first three columns; the filter column is among them.
        let projection_input = Some(vec![0, 1, 2].into());
        let request = ScanRequest {
            projection_input,
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();

        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "region_id");
        assert_eq!(fields[2].name(), "table_id");
    }

    #[test]
    fn test_storage_build_plan() {
        let projection_input = Some(vec![0, 2].into());
        let request = ScanRequest {
            projection_input,
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "file_path");
        assert_eq!(fields[1].name(), "last_modified_ms");
    }

    #[test]
    fn test_puffin_index_meta_build_plan() {
        // Columns 0 and 9 are `table_dir` and `index_type`; the filter column must
        // be part of the projection, as in the sibling build-plan tests.
        let projection_input = Some(vec![0, 9].into());
        let request = ScanRequest {
            projection_input,
            filters: vec![binary_expr(
                col("index_type"),
                Operator::Eq,
                lit("inverted"),
            )],
            limit: Some(2),
            ..Default::default()
        };

        let plan = PuffinIndexMetaEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(PuffinIndexMetaEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 9]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "index_type");
    }
}