store_api/sst_entry.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_recordbatch::DfRecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datatypes::arrow::array::{
    ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
    UInt32Array, UInt64Array,
};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow_array::StringArray;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Serialize};

use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};

/// An entry describing an SST file recorded in the engine's manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// The table directory this file belongs to.
    pub table_dir: String,
    /// The id of the region that refers to this file.
    pub region_id: RegionId,
    /// The table id this file belongs to.
    pub table_id: TableId,
    /// The region number this file belongs to.
    pub region_number: RegionNumber,
    /// The region group this file belongs to.
    pub region_group: RegionGroup,
    /// The region sequence this file belongs to.
    pub region_sequence: RegionSeq,
    /// Engine-specific file identifier (string form).
    pub file_id: String,
    /// Engine-specific index file identifier (string form).
    pub index_file_id: Option<String>,
    /// SST level.
    pub level: u8,
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes.
    pub file_size: u64,
    /// Full path of the index file in object store.
    pub index_file_path: Option<String>,
    /// Size of the index file in object store, in bytes.
    pub index_file_size: Option<u64>,
    /// Number of rows in the SST.
    pub num_rows: u64,
    /// Number of row groups in the SST.
    pub num_row_groups: u64,
    /// Number of series in the SST.
    pub num_series: Option<u64>,
    /// Min timestamp.
    pub min_ts: Timestamp,
    /// Max timestamp.
    pub max_ts: Timestamp,
    /// The sequence number associated with this file.
    pub sequence: Option<u64>,
    /// The id of the region that created this file.
    pub origin_region_id: RegionId,
    /// The node id fetched from the manifest.
    pub node_id: Option<u64>,
    /// Whether this file is visible in the current version.
    pub visible: bool,
}

impl ManifestSstEntry {
    /// Returns the schema of the manifest sst entry.
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
            ColumnSchema::new("level", Ty::uint8_datatype(), false),
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_series", Ty::uint64_datatype(), true),
            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
            ColumnSchema::new("visible", Ty::boolean_datatype(), false),
        ]))
    }

    /// Converts a list of manifest sst entries to a record batch.
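    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here; assumes `entry` is a fully
    /// populated [`ManifestSstEntry`]):
    ///
    /// ```ignore
    /// let batch = ManifestSstEntry::to_record_batch(std::slice::from_ref(&entry))?;
    /// assert_eq!(batch.num_rows(), 1);
    /// assert_eq!(
    ///     batch.num_columns(),
    ///     ManifestSstEntry::schema().arrow_schema().fields().len()
    /// );
    /// ```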
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
        let levels = entries.iter().map(|e| e.level);
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let num_rows = entries.iter().map(|e| e.num_rows);
        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
        let num_series = entries.iter().map(|e| e.num_series);
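        // Timestamps are normalized to nanoseconds to match the schema; a
        // conversion that fails (e.g. on overflow) yields a null in the column.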
        let min_ts = entries.iter().map(|e| {
            e.min_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let max_ts = entries.iter().map(|e| {
            e.max_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let sequences = entries.iter().map(|e| e.sequence);
        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
        let node_ids = entries.iter().map(|e| e.node_id);
        let visible_flags = entries.iter().map(|e| Some(e.visible));

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(StringArray::from_iter(index_file_ids)),
            Arc::new(UInt8Array::from_iter_values(levels)),
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter_values(file_sizes)),
            Arc::new(StringArray::from_iter(index_file_paths)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(UInt64Array::from_iter_values(num_rows)),
            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
            Arc::new(UInt64Array::from_iter(num_series)),
            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
            Arc::new(UInt64Array::from_iter(sequences)),
            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
            Arc::new(UInt64Array::from_iter(node_ids)),
            Arc::new(BooleanArray::from_iter(visible_flags)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Reserved internal inspect table name.
    ///
    /// This table name is used only for building logical plans on the
    /// frontend -> datanode path. It is not user-visible and cannot be
    /// referenced by user queries.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_manifest"
    }

    /// Builds a logical plan for scanning the manifest sst entries.
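    ///
    /// A minimal sketch of a caller (not compiled here; mirrors the tests below):
    ///
    /// ```ignore
    /// let request = ScanRequest {
    ///     projection: Some(vec![0, 1, 2]),
    ///     limit: Some(5),
    ///     ..Default::default()
    /// };
    /// let plan = ManifestSstEntry::build_plan(request)?;
    /// ```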
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

/// An entry describing an SST file listed directly from the storage layer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes.
    pub file_size: Option<u64>,
    /// Last modified time in milliseconds since epoch, if available from storage.
    pub last_modified_ms: Option<Timestamp>,
    /// The node id fetched from the manifest.
    pub node_id: Option<u64>,
}

impl StorageSstEntry {
    /// Returns the schema of the storage sst entry.
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new(
                "last_modified_ms",
                Ty::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts a list of storage sst entries to a record batch.
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
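        // Normalize to epoch milliseconds; entries without a timestamp (or
        // whose conversion fails) become nulls.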
        let last_modified_ms = entries.iter().map(|e| {
            e.last_modified_ms
                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
        });
        let node_ids = entries.iter().map(|e| e.node_id);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter(file_sizes)),
            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Reserved internal inspect table name.
    ///
    /// This table name is used only for building logical plans on the
    /// frontend -> datanode path. It is not user-visible and cannot be
    /// referenced by user queries.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_storage"
    }

    /// Builds a logical plan for scanning the storage sst entries.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

/// An entry describing puffin index metadata for inspection.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PuffinIndexMetaEntry {
    /// The table directory this index belongs to.
    pub table_dir: String,
    /// The full path of the index file in object store.
    pub index_file_path: String,
    /// The region id referencing the index file.
    pub region_id: RegionId,
    /// The table id referencing the index file.
    pub table_id: TableId,
    /// The region number referencing the index file.
    pub region_number: RegionNumber,
    /// The region group referencing the index file.
    pub region_group: RegionGroup,
    /// The region sequence referencing the index file.
    pub region_sequence: RegionSeq,
    /// Engine-specific file identifier (string form).
    pub file_id: String,
    /// Size of the index file in object store (if available).
    pub index_file_size: Option<u64>,
    /// Logical index type (`bloom_filter`, `fulltext_bloom`, `fulltext_tantivy`, `inverted`).
    pub index_type: String,
    /// Target type (`column`, ...).
    pub target_type: String,
    /// Encoded target key string.
    pub target_key: String,
    /// Structured JSON describing the target.
    pub target_json: String,
    /// Size of the blob storing this target.
    pub blob_size: u64,
    /// Structured JSON describing index-specific metadata (if available).
    pub meta_json: Option<String>,
    /// Node id associated with the index file (if known).
    pub node_id: Option<u64>,
}

impl PuffinIndexMetaEntry {
    /// Returns the schema describing puffin index metadata entries.
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("index_type", Ty::string_datatype(), false),
            ColumnSchema::new("target_type", Ty::string_datatype(), false),
            ColumnSchema::new("target_key", Ty::string_datatype(), false),
            ColumnSchema::new("target_json", Ty::string_datatype(), false),
            ColumnSchema::new("blob_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("meta_json", Ty::string_datatype(), true),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts a list of puffin index metadata entries to a record batch.
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let index_types = entries.iter().map(|e| e.index_type.as_str());
        let target_types = entries.iter().map(|e| e.target_type.as_str());
        let target_keys = entries.iter().map(|e| e.target_key.as_str());
        let target_jsons = entries.iter().map(|e| e.target_json.as_str());
        let blob_sizes = entries.iter().map(|e| e.blob_size);
        let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref());
        let node_ids = entries.iter().map(|e| e.node_id);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(StringArray::from_iter_values(index_file_paths)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(StringArray::from_iter_values(index_types)),
            Arc::new(StringArray::from_iter_values(target_types)),
            Arc::new(StringArray::from_iter_values(target_keys)),
            Arc::new(StringArray::from_iter_values(target_jsons)),
            Arc::new(UInt64Array::from_iter_values(blob_sizes)),
            Arc::new(StringArray::from_iter(meta_jsons)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Reserved internal inspect table name for puffin index metadata.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__puffin_index_meta"
    }

    /// Builds a logical plan for scanning puffin index metadata entries.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

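/// Shared helper for the `build_plan` methods above: scans the reserved
/// inspection table with the request's projection, then applies its filters
/// and optional limit.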
fn build_plan_helper(
    scan_request: ScanRequest,
    table_name: &str,
    schema: SchemaRef,
) -> Result<LogicalPlan, DataFusionError> {
    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());

    let mut builder = LogicalPlanBuilder::scan(
        table_name,
        Arc::new(table_source),
        scan_request.projection.clone(),
    )?;

    for filter in scan_request.filters {
        builder = builder.filter(filter)?;
    }

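    // `limit(skip, fetch)`: skip no rows and fetch at most `limit` rows.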
    if let Some(limit) = scan_request.limit {
        builder = builder.limit(0, Some(limit))?;
    }

    builder.build()
}

#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        // Prepare entries
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
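        // A region number packs the group into the top byte and the sequence
        // into the low 24 bits.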
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                index_file_id: None,
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                num_series: Some(5),
                min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns
                max_ts: Timestamp::new_second(2),         // 2s -> 2_000_000_000ns
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
                visible: false,
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                index_file_id: Some("idx".to_string()),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                num_series: None,
                min_ts: Timestamp::new_nanosecond(5),     // 5ns
                max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
                visible: true,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        // Schema checks
        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
            assert_eq!(f.name(), batch.schema().field(i).name());
            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
        }

        // Column asserts
        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids = batch
            .column(6)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let index_file_ids = batch
            .column(7)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_ids.is_null(0));
        assert_eq!("idx", index_file_ids.value(1));

        let levels = batch
            .column(8)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths = batch
            .column(9)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes = batch
            .column(10)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths = batch
            .column(11)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = batch
            .column(12)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups = batch
            .column(14)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let num_series = batch
            .column(15)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(5, num_series.value(0));
        assert!(num_series.is_null(1));

        let min_ts = batch
            .column(16)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts = batch
            .column(17)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences = batch
            .column(18)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = batch
            .column(19)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids = batch
            .column(20)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));

        let visible = batch
            .column(21)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!visible.value(0));
        assert!(visible.value(1));
    }

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_puffin_index_meta_to_record_batch() {
        let entries = vec![
            PuffinIndexMetaEntry {
                table_dir: "table1".to_string(),
                index_file_path: "index1".to_string(),
                region_id: RegionId::with_group_and_seq(10, 0, 20),
                table_id: 10,
                region_number: 20,
                region_group: 0,
                region_sequence: 20,
                file_id: "file1".to_string(),
                index_file_size: Some(1024),
                index_type: "bloom_filter".to_string(),
                target_type: "column".to_string(),
                target_key: "1".to_string(),
                target_json: "{\"column\":1}".to_string(),
                blob_size: 256,
                meta_json: Some("{\"bloom\":{}}".to_string()),
                node_id: Some(42),
            },
            PuffinIndexMetaEntry {
                table_dir: "table2".to_string(),
                index_file_path: "index2".to_string(),
                region_id: RegionId::with_group_and_seq(11, 0, 21),
                table_id: 11,
                region_number: 21,
                region_group: 0,
                region_sequence: 21,
                file_id: "file2".to_string(),
                index_file_size: None,
                index_type: "inverted".to_string(),
                target_type: "unknown".to_string(),
                target_key: "legacy".to_string(),
                target_json: "{}".to_string(),
                blob_size: 0,
                meta_json: None,
                node_id: None,
            },
        ];

        let schema = PuffinIndexMetaEntry::schema();
        let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("table1", table_dirs.value(0));
        assert_eq!("table2", table_dirs.value(1));

        let index_file_paths = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("index1", index_file_paths.value(0));
        assert_eq!("index2", index_file_paths.value(1));

        let region_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(
            RegionId::with_group_and_seq(10, 0, 20).as_u64(),
            region_ids.value(0)
        );
        assert_eq!(
            RegionId::with_group_and_seq(11, 0, 21).as_u64(),
            region_ids.value(1)
        );

        let table_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(10, table_ids.value(0));
        assert_eq!(11, table_ids.value(1));

        let region_numbers = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(20, region_numbers.value(0));
        assert_eq!(21, region_numbers.value(1));

        let region_groups = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(0, region_groups.value(0));
        assert_eq!(0, region_groups.value(1));

        let region_sequences = batch
            .column(6)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(20, region_sequences.value(0));
        assert_eq!(21, region_sequences.value(1));

        let file_ids = batch
            .column(7)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("file1", file_ids.value(0));
        assert_eq!("file2", file_ids.value(1));

        let index_file_sizes = batch
            .column(8)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1024, index_file_sizes.value(0));
        assert!(index_file_sizes.is_null(1));

        let index_types = batch
            .column(9)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("bloom_filter", index_types.value(0));
        assert_eq!("inverted", index_types.value(1));

        let target_types = batch
            .column(10)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("column", target_types.value(0));
        assert_eq!("unknown", target_types.value(1));

        let target_keys = batch
            .column(11)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("1", target_keys.value(0));
        assert_eq!("legacy", target_keys.value(1));

        let target_json = batch
            .column(12)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("{\"column\":1}", target_json.value(0));
        assert_eq!("{}", target_json.value(1));

        let blob_sizes = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(256, blob_sizes.value(0));
        assert_eq!(0, blob_sizes.value(1));

        let meta_jsons = batch
            .column(14)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("{\"bloom\":{}}", meta_jsons.value(0));
        assert!(meta_jsons.is_null(1));

        let node_ids = batch
            .column(15)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(42, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        // Note: filter must reference a column in the projected schema
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();

        // The builder applies the filter first and the limit last, so the plan
        // is Limit -> Filter -> TableScan; extract_scan walks down to the
        // TableScan and verifies key fields.
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        // projected schema should match projection
        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "region_id");
        assert_eq!(fields[2].name(), "table_id");
    }

    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "file_path");
        assert_eq!(fields[1].name(), "last_modified_ms");
    }

    // Helper to reach TableScan and detect presence of Filter/Limit in plan
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        match plan {
            LogicalPlan::Filter(f) => {
                let (scan, _, has_limit) = extract_scan(&f.input);
                (scan, true, has_limit)
            }
            LogicalPlan::Limit(Limit { input, .. }) => {
                let (scan, has_filter, _) = extract_scan(input);
                (scan, has_filter, true)
            }
            LogicalPlan::TableScan(scan) => (scan, false, false),
            other => panic!("unexpected plan: {other:?}"),
        }
    }
}
984}