Skip to main content

store_api/
sst_entry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use common_recordbatch::DfRecordBatch;
19use common_time::Timestamp;
20use common_time::timestamp::TimeUnit;
21use datafusion_common::DataFusionError;
22use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
23use datatypes::arrow::array::{
24    ArrayRef, BinaryArray, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray,
25    UInt8Array, UInt32Array, UInt64Array,
26};
27use datatypes::arrow::error::ArrowError;
28use datatypes::arrow_array::StringArray;
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use serde::{Deserialize, Serialize};
31
32use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
33
// Index type names reported for blobs stored in puffin index files.

/// Index type name for plain bloom filter indexes.
pub const PUFFIN_INDEX_TYPE_BLOOM_FILTER: &str = "bloom_filter";
/// Index type name for inverted indexes.
pub const PUFFIN_INDEX_TYPE_INVERTED: &str = "inverted";
/// Index type name for full-text indexes whose backing structure is a bloom filter.
pub const PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom";
/// Index type name for full-text indexes whose backing structure is Tantivy.
pub const PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy";
42
43/// An entry describing a SST file known by the engine's manifest.
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
45pub struct ManifestSstEntry {
46    /// The table directory this file belongs to.
47    pub table_dir: String,
48    /// The region id of region that refers to the file.
49    pub region_id: RegionId,
50    /// The table id this file belongs to.
51    pub table_id: TableId,
52    /// The region number this file belongs to.
53    pub region_number: RegionNumber,
54    /// The region group this file belongs to.
55    pub region_group: RegionGroup,
56    /// The region sequence this file belongs to.
57    pub region_sequence: RegionSeq,
58    /// Engine-specific file identifier (string form).
59    pub file_id: String,
60    /// Index version, increment when the index file is rebuilt.
61    pub index_version: u64,
62    /// SST level.
63    pub level: u8,
64    /// Full path of the SST file in object store.
65    pub file_path: String,
66    /// File size in bytes.
67    pub file_size: u64,
68    /// Full path of the index file in object store.
69    pub index_file_path: Option<String>,
70    /// File size of the index file in object store.
71    pub index_file_size: Option<u64>,
72    /// Number of rows in the SST.
73    pub num_rows: u64,
74    /// Number of row groups in the SST.
75    pub num_row_groups: u64,
76    /// Number of series in the SST.
77    pub num_series: Option<u64>,
78    /// Min timestamp.
79    pub min_ts: Timestamp,
80    /// Max timestamp.
81    pub max_ts: Timestamp,
82    /// The sequence number associated with this file.
83    pub sequence: Option<u64>,
84    /// The region id of region that creates the file.
85    pub origin_region_id: RegionId,
86    /// The node id fetched from the manifest.
87    pub node_id: Option<u64>,
88    /// Whether this file is visible in current version.
89    pub visible: bool,
90    /// Minimum encoded primary key in the SST.
91    pub primary_key_min: Option<Bytes>,
92    /// Maximum encoded primary key in the SST.
93    pub primary_key_max: Option<Bytes>,
94}
95
96impl ManifestSstEntry {
97    /// Returns the schema of the manifest sst entry.
98    pub fn schema() -> SchemaRef {
99        use datatypes::prelude::ConcreteDataType as Ty;
100        Arc::new(Schema::new(vec![
101            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
102            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
103            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
104            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
105            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
106            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
107            ColumnSchema::new("file_id", Ty::string_datatype(), false),
108            ColumnSchema::new("index_version", Ty::uint64_datatype(), false),
109            ColumnSchema::new("level", Ty::uint8_datatype(), false),
110            ColumnSchema::new("file_path", Ty::string_datatype(), false),
111            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
112            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
113            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
114            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
115            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
116            ColumnSchema::new("num_series", Ty::uint64_datatype(), true),
117            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
118            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
119            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
120            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
121            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
122            ColumnSchema::new("visible", Ty::boolean_datatype(), false),
123            ColumnSchema::new("primary_key_min", Ty::binary_datatype(), true),
124            ColumnSchema::new("primary_key_max", Ty::binary_datatype(), true),
125        ]))
126    }
127
128    /// Converts a list of manifest sst entries to a record batch.
129    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
130        let schema = Self::schema();
131        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
132        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
133        let table_ids = entries.iter().map(|e| e.table_id);
134        let region_numbers = entries.iter().map(|e| e.region_number);
135        let region_groups = entries.iter().map(|e| e.region_group);
136        let region_sequences = entries.iter().map(|e| e.region_sequence);
137        let file_ids = entries.iter().map(|e| e.file_id.as_str());
138        let index_versions = entries.iter().map(|e| e.index_version);
139        let levels = entries.iter().map(|e| e.level);
140        let file_paths = entries.iter().map(|e| e.file_path.as_str());
141        let file_sizes = entries.iter().map(|e| e.file_size);
142        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
143        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
144        let num_rows = entries.iter().map(|e| e.num_rows);
145        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
146        let num_series = entries.iter().map(|e| e.num_series);
147        let min_ts = entries.iter().map(|e| {
148            e.min_ts
149                .convert_to(TimeUnit::Nanosecond)
150                .map(|ts| ts.value())
151        });
152        let max_ts = entries.iter().map(|e| {
153            e.max_ts
154                .convert_to(TimeUnit::Nanosecond)
155                .map(|ts| ts.value())
156        });
157        let sequences = entries.iter().map(|e| e.sequence);
158        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
159        let node_ids = entries.iter().map(|e| e.node_id);
160        let visible_flags = entries.iter().map(|e| Some(e.visible));
161        let primary_key_min = entries.iter().map(|e| e.primary_key_min.as_deref());
162        let primary_key_max = entries.iter().map(|e| e.primary_key_max.as_deref());
163
164        let columns: Vec<ArrayRef> = vec![
165            Arc::new(StringArray::from_iter_values(table_dirs)),
166            Arc::new(UInt64Array::from_iter_values(region_ids)),
167            Arc::new(UInt32Array::from_iter_values(table_ids)),
168            Arc::new(UInt32Array::from_iter_values(region_numbers)),
169            Arc::new(UInt8Array::from_iter_values(region_groups)),
170            Arc::new(UInt32Array::from_iter_values(region_sequences)),
171            Arc::new(StringArray::from_iter_values(file_ids)),
172            Arc::new(UInt64Array::from_iter(index_versions)),
173            Arc::new(UInt8Array::from_iter_values(levels)),
174            Arc::new(StringArray::from_iter_values(file_paths)),
175            Arc::new(UInt64Array::from_iter_values(file_sizes)),
176            Arc::new(StringArray::from_iter(index_file_paths)),
177            Arc::new(UInt64Array::from_iter(index_file_sizes)),
178            Arc::new(UInt64Array::from_iter_values(num_rows)),
179            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
180            Arc::new(UInt64Array::from_iter(num_series)),
181            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
182            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
183            Arc::new(UInt64Array::from_iter(sequences)),
184            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
185            Arc::new(UInt64Array::from_iter(node_ids)),
186            Arc::new(BooleanArray::from_iter(visible_flags)),
187            Arc::new(BinaryArray::from_iter(primary_key_min)),
188            Arc::new(BinaryArray::from_iter(primary_key_max)),
189        ];
190
191        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
192    }
193
194    /// Reserved internal inspect table name.
195    ///
196    /// This table name is used only for building logical plans on the
197    /// frontend -> datanode path. It is not user-visible and cannot be
198    /// referenced by user queries.
199    pub fn reserved_table_name_for_inspection() -> &'static str {
200        "__inspect/__mito/__sst_manifest"
201    }
202
203    /// Builds a logical plan for scanning the manifest sst entries.
204    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
205        build_plan_helper(
206            scan_request,
207            Self::reserved_table_name_for_inspection(),
208            Self::schema(),
209        )
210    }
211}
212
213/// An entry describing a SST file listed from storage layer directly.
214#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
215pub struct StorageSstEntry {
216    /// Full path of the SST file in object store.
217    pub file_path: String,
218    /// File size in bytes.
219    pub file_size: Option<u64>,
220    /// Last modified time in milliseconds since epoch, if available from storage.
221    pub last_modified_ms: Option<Timestamp>,
222    /// The node id fetched from the manifest.
223    pub node_id: Option<u64>,
224}
225
226impl StorageSstEntry {
227    /// Returns the schema of the storage sst entry.
228    pub fn schema() -> SchemaRef {
229        use datatypes::prelude::ConcreteDataType as Ty;
230        Arc::new(Schema::new(vec![
231            ColumnSchema::new("file_path", Ty::string_datatype(), false),
232            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
233            ColumnSchema::new(
234                "last_modified_ms",
235                Ty::timestamp_millisecond_datatype(),
236                true,
237            ),
238            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
239        ]))
240    }
241
242    /// Converts a list of storage sst entries to a record batch.
243    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
244        let schema = Self::schema();
245        let file_paths = entries.iter().map(|e| e.file_path.as_str());
246        let file_sizes = entries.iter().map(|e| e.file_size);
247        let last_modified_ms = entries.iter().map(|e| {
248            e.last_modified_ms
249                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
250        });
251        let node_ids = entries.iter().map(|e| e.node_id);
252
253        let columns: Vec<ArrayRef> = vec![
254            Arc::new(StringArray::from_iter_values(file_paths)),
255            Arc::new(UInt64Array::from_iter(file_sizes)),
256            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
257            Arc::new(UInt64Array::from_iter(node_ids)),
258        ];
259
260        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
261    }
262
263    /// Reserved internal inspect table name.
264    ///
265    /// This table name is used only for building logical plans on the
266    /// frontend -> datanode path. It is not user-visible and cannot be
267    /// referenced by user queries.
268    pub fn reserved_table_name_for_inspection() -> &'static str {
269        "__inspect/__mito/__sst_storage"
270    }
271
272    /// Builds a logical plan for scanning the storage sst entries.
273    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
274        build_plan_helper(
275            scan_request,
276            Self::reserved_table_name_for_inspection(),
277            Self::schema(),
278        )
279    }
280}
281
282/// An entry describing puffin index metadata for inspection.
283#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
284pub struct PuffinIndexMetaEntry {
285    /// The table directory this index belongs to.
286    pub table_dir: String,
287    /// The full path of the index file in object store.
288    pub index_file_path: String,
289    /// The region id referencing the index file.
290    pub region_id: RegionId,
291    /// The table id referencing the index file.
292    pub table_id: TableId,
293    /// The region number referencing the index file.
294    pub region_number: RegionNumber,
295    /// The region group referencing the index file.
296    pub region_group: RegionGroup,
297    /// The region sequence referencing the index file.
298    pub region_sequence: RegionSeq,
299    /// Engine-specific file identifier (string form).
300    pub file_id: String,
301    /// Size of the index file in object store (if available).
302    pub index_file_size: Option<u64>,
303    /// Logical index type (`bloom_filter`, `fulltext_bloom`, `fulltext_tantivy`, `inverted`).
304    pub index_type: String,
305    /// Target type (`column`, ...).
306    pub target_type: String,
307    /// Encoded target key string.
308    pub target_key: String,
309    /// Structured JSON describing the target.
310    pub target_json: String,
311    /// Size of the blob storing this target.
312    pub blob_size: u64,
313    /// Structured JSON describing index-specific metadata (if available).
314    pub meta_json: Option<String>,
315    /// Node id associated with the index file (if known).
316    pub node_id: Option<u64>,
317}
318
319impl PuffinIndexMetaEntry {
320    /// Returns the schema describing puffin index metadata entries.
321    pub fn schema() -> SchemaRef {
322        use datatypes::prelude::ConcreteDataType as Ty;
323        Arc::new(Schema::new(vec![
324            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
325            ColumnSchema::new("index_file_path", Ty::string_datatype(), false),
326            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
327            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
328            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
329            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
330            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
331            ColumnSchema::new("file_id", Ty::string_datatype(), false),
332            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
333            ColumnSchema::new("index_type", Ty::string_datatype(), false),
334            ColumnSchema::new("target_type", Ty::string_datatype(), false),
335            ColumnSchema::new("target_key", Ty::string_datatype(), false),
336            ColumnSchema::new("target_json", Ty::string_datatype(), false),
337            ColumnSchema::new("blob_size", Ty::uint64_datatype(), false),
338            ColumnSchema::new("meta_json", Ty::string_datatype(), true),
339            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
340        ]))
341    }
342
343    /// Converts a list of puffin index metadata entries to a record batch.
344    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
345        let schema = Self::schema();
346        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
347        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str());
348        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
349        let table_ids = entries.iter().map(|e| e.table_id);
350        let region_numbers = entries.iter().map(|e| e.region_number);
351        let region_groups = entries.iter().map(|e| e.region_group);
352        let region_sequences = entries.iter().map(|e| e.region_sequence);
353        let file_ids = entries.iter().map(|e| e.file_id.as_str());
354        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
355        let index_types = entries.iter().map(|e| e.index_type.as_str());
356        let target_types = entries.iter().map(|e| e.target_type.as_str());
357        let target_keys = entries.iter().map(|e| e.target_key.as_str());
358        let target_jsons = entries.iter().map(|e| e.target_json.as_str());
359        let blob_sizes = entries.iter().map(|e| e.blob_size);
360        let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref());
361        let node_ids = entries.iter().map(|e| e.node_id);
362
363        let columns: Vec<ArrayRef> = vec![
364            Arc::new(StringArray::from_iter_values(table_dirs)),
365            Arc::new(StringArray::from_iter_values(index_file_paths)),
366            Arc::new(UInt64Array::from_iter_values(region_ids)),
367            Arc::new(UInt32Array::from_iter_values(table_ids)),
368            Arc::new(UInt32Array::from_iter_values(region_numbers)),
369            Arc::new(UInt8Array::from_iter_values(region_groups)),
370            Arc::new(UInt32Array::from_iter_values(region_sequences)),
371            Arc::new(StringArray::from_iter_values(file_ids)),
372            Arc::new(UInt64Array::from_iter(index_file_sizes)),
373            Arc::new(StringArray::from_iter_values(index_types)),
374            Arc::new(StringArray::from_iter_values(target_types)),
375            Arc::new(StringArray::from_iter_values(target_keys)),
376            Arc::new(StringArray::from_iter_values(target_jsons)),
377            Arc::new(UInt64Array::from_iter_values(blob_sizes)),
378            Arc::new(StringArray::from_iter(meta_jsons)),
379            Arc::new(UInt64Array::from_iter(node_ids)),
380        ];
381
382        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
383    }
384
385    /// Reserved internal inspect table name for puffin index metadata.
386    pub fn reserved_table_name_for_inspection() -> &'static str {
387        "__inspect/__mito/__puffin_index_meta"
388    }
389
390    /// Builds a logical plan for scanning puffin index metadata entries.
391    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
392        build_plan_helper(
393            scan_request,
394            Self::reserved_table_name_for_inspection(),
395            Self::schema(),
396        )
397    }
398}
399
400fn build_plan_helper(
401    scan_request: ScanRequest,
402    table_name: &str,
403    schema: SchemaRef,
404) -> Result<LogicalPlan, DataFusionError> {
405    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());
406
407    let projection = scan_request.projection_input.map(|input| input.projection);
408    let mut builder = LogicalPlanBuilder::scan(table_name, Arc::new(table_source), projection)?;
409
410    for filter in scan_request.filters {
411        builder = builder.filter(filter)?;
412    }
413
414    if let Some(limit) = scan_request.limit {
415        builder = builder.limit(0, Some(limit))?;
416    }
417
418    builder.build()
419}
420
421#[cfg(test)]
422mod tests {
423    use datafusion_common::TableReference;
424    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
425    use datatypes::arrow::array::{
426        Array, BinaryArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
427        UInt32Array, UInt64Array,
428    };
429    use datatypes::arrow_array::StringArray;
430
431    use super::*;
432
433    #[test]
434    fn test_sst_entry_manifest_to_record_batch() {
435        // Prepare entries
436        let table_id1: TableId = 1;
437        let region_group1: RegionGroup = 2;
438        let region_seq1: RegionSeq = 3;
439        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
440        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);
441
442        let table_id2: TableId = 5;
443        let region_group2: RegionGroup = 1;
444        let region_seq2: RegionSeq = 42;
445        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
446        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);
447
448        let entries = vec![
449            ManifestSstEntry {
450                table_dir: "tdir1".to_string(),
451                region_id: region_id1,
452                table_id: table_id1,
453                region_number: region_number1,
454                region_group: region_group1,
455                region_sequence: region_seq1,
456                file_id: "f1".to_string(),
457                index_version: 0,
458                level: 1,
459                file_path: "/p1".to_string(),
460                file_size: 100,
461                index_file_path: None,
462                index_file_size: None,
463                num_rows: 10,
464                num_row_groups: 2,
465                num_series: Some(5),
466                min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns
467                max_ts: Timestamp::new_second(2),         // 2s -> 2_000_000_000ns
468                sequence: None,
469                origin_region_id: region_id1,
470                node_id: Some(1),
471                visible: false,
472                primary_key_min: Some(Bytes::from_static(b"aaa")),
473                primary_key_max: Some(Bytes::from_static(b"zzz")),
474            },
475            ManifestSstEntry {
476                table_dir: "tdir2".to_string(),
477                region_id: region_id2,
478                table_id: table_id2,
479                region_number: region_number2,
480                region_group: region_group2,
481                region_sequence: region_seq2,
482                file_id: "f2".to_string(),
483                index_version: 1,
484                level: 3,
485                file_path: "/p2".to_string(),
486                file_size: 200,
487                index_file_path: Some("idx".to_string()),
488                index_file_size: Some(11),
489                num_rows: 20,
490                num_row_groups: 4,
491                num_series: None,
492                min_ts: Timestamp::new_nanosecond(5),     // 5ns
493                max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns
494                sequence: Some(9),
495                origin_region_id: region_id2,
496                node_id: None,
497                visible: true,
498                primary_key_min: None,
499                primary_key_max: None,
500            },
501        ];
502
503        let schema = ManifestSstEntry::schema();
504        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();
505
506        // Schema checks
507        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
508        assert_eq!(2, batch.num_rows());
509        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
510            assert_eq!(f.name(), batch.schema().field(i).name());
511            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
512            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
513        }
514
515        // Column asserts
516        let table_dirs = batch
517            .column(0)
518            .as_any()
519            .downcast_ref::<StringArray>()
520            .unwrap();
521        assert_eq!("tdir1", table_dirs.value(0));
522        assert_eq!("tdir2", table_dirs.value(1));
523
524        let region_ids = batch
525            .column(1)
526            .as_any()
527            .downcast_ref::<UInt64Array>()
528            .unwrap();
529        assert_eq!(region_id1.as_u64(), region_ids.value(0));
530        assert_eq!(region_id2.as_u64(), region_ids.value(1));
531
532        let table_ids = batch
533            .column(2)
534            .as_any()
535            .downcast_ref::<UInt32Array>()
536            .unwrap();
537        assert_eq!(table_id1, table_ids.value(0));
538        assert_eq!(table_id2, table_ids.value(1));
539
540        let region_numbers = batch
541            .column(3)
542            .as_any()
543            .downcast_ref::<UInt32Array>()
544            .unwrap();
545        assert_eq!(region_number1, region_numbers.value(0));
546        assert_eq!(region_number2, region_numbers.value(1));
547
548        let region_groups = batch
549            .column(4)
550            .as_any()
551            .downcast_ref::<UInt8Array>()
552            .unwrap();
553        assert_eq!(region_group1, region_groups.value(0));
554        assert_eq!(region_group2, region_groups.value(1));
555
556        let region_sequences = batch
557            .column(5)
558            .as_any()
559            .downcast_ref::<UInt32Array>()
560            .unwrap();
561        assert_eq!(region_seq1, region_sequences.value(0));
562        assert_eq!(region_seq2, region_sequences.value(1));
563
564        let file_ids = batch
565            .column(6)
566            .as_any()
567            .downcast_ref::<StringArray>()
568            .unwrap();
569        assert_eq!("f1", file_ids.value(0));
570        assert_eq!("f2", file_ids.value(1));
571
572        let index_versions = batch
573            .column(7)
574            .as_any()
575            .downcast_ref::<UInt64Array>()
576            .unwrap();
577        assert_eq!(0, index_versions.value(0));
578        assert_eq!(1, index_versions.value(1));
579
580        let levels = batch
581            .column(8)
582            .as_any()
583            .downcast_ref::<UInt8Array>()
584            .unwrap();
585        assert_eq!(1, levels.value(0));
586        assert_eq!(3, levels.value(1));
587
588        let file_paths = batch
589            .column(9)
590            .as_any()
591            .downcast_ref::<StringArray>()
592            .unwrap();
593        assert_eq!("/p1", file_paths.value(0));
594        assert_eq!("/p2", file_paths.value(1));
595
596        let file_sizes = batch
597            .column(10)
598            .as_any()
599            .downcast_ref::<UInt64Array>()
600            .unwrap();
601        assert_eq!(100, file_sizes.value(0));
602        assert_eq!(200, file_sizes.value(1));
603
604        let index_file_paths = batch
605            .column(11)
606            .as_any()
607            .downcast_ref::<StringArray>()
608            .unwrap();
609        assert!(index_file_paths.is_null(0));
610        assert_eq!("idx", index_file_paths.value(1));
611
612        let index_file_sizes = batch
613            .column(12)
614            .as_any()
615            .downcast_ref::<UInt64Array>()
616            .unwrap();
617        assert!(index_file_sizes.is_null(0));
618        assert_eq!(11, index_file_sizes.value(1));
619
620        let num_rows = batch
621            .column(13)
622            .as_any()
623            .downcast_ref::<UInt64Array>()
624            .unwrap();
625        assert_eq!(10, num_rows.value(0));
626        assert_eq!(20, num_rows.value(1));
627
628        let num_row_groups = batch
629            .column(14)
630            .as_any()
631            .downcast_ref::<UInt64Array>()
632            .unwrap();
633        assert_eq!(2, num_row_groups.value(0));
634        assert_eq!(4, num_row_groups.value(1));
635
636        let num_series = batch
637            .column(15)
638            .as_any()
639            .downcast_ref::<UInt64Array>()
640            .unwrap();
641        assert_eq!(5, num_series.value(0));
642        assert!(num_series.is_null(1));
643
644        let min_ts = batch
645            .column(16)
646            .as_any()
647            .downcast_ref::<TimestampNanosecondArray>()
648            .unwrap();
649        assert_eq!(1_000_000_000, min_ts.value(0));
650        assert_eq!(5, min_ts.value(1));
651
652        let max_ts = batch
653            .column(17)
654            .as_any()
655            .downcast_ref::<TimestampNanosecondArray>()
656            .unwrap();
657        assert_eq!(2_000_000_000, max_ts.value(0));
658        assert_eq!(2_000_000, max_ts.value(1));
659
660        let sequences = batch
661            .column(18)
662            .as_any()
663            .downcast_ref::<UInt64Array>()
664            .unwrap();
665        assert!(sequences.is_null(0));
666        assert_eq!(9, sequences.value(1));
667
668        let origin_region_ids = batch
669            .column(19)
670            .as_any()
671            .downcast_ref::<UInt64Array>()
672            .unwrap();
673        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
674        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));
675
676        let node_ids = batch
677            .column(20)
678            .as_any()
679            .downcast_ref::<UInt64Array>()
680            .unwrap();
681        assert_eq!(1, node_ids.value(0));
682        assert!(node_ids.is_null(1));
683
684        let visible = batch
685            .column(21)
686            .as_any()
687            .downcast_ref::<BooleanArray>()
688            .unwrap();
689        assert!(!visible.value(0));
690        assert!(visible.value(1));
691
692        let primary_key_min = batch
693            .column(22)
694            .as_any()
695            .downcast_ref::<BinaryArray>()
696            .unwrap();
697        assert_eq!(b"aaa", primary_key_min.value(0));
698        assert!(primary_key_min.is_null(1));
699
700        let primary_key_max = batch
701            .column(23)
702            .as_any()
703            .downcast_ref::<BinaryArray>()
704            .unwrap();
705        assert_eq!(b"zzz", primary_key_max.value(0));
706        assert!(primary_key_max.is_null(1));
707    }
708
709    #[test]
710    fn test_sst_entry_storage_to_record_batch() {
711        let entries = vec![
712            StorageSstEntry {
713                file_path: "/s1".to_string(),
714                file_size: None,
715                last_modified_ms: None,
716                node_id: Some(1),
717            },
718            StorageSstEntry {
719                file_path: "/s2".to_string(),
720                file_size: Some(123),
721                last_modified_ms: Some(Timestamp::new_millisecond(456)),
722                node_id: None,
723            },
724        ];
725
726        let schema = StorageSstEntry::schema();
727        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();
728
729        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
730        assert_eq!(2, batch.num_rows());
731
732        let file_paths = batch
733            .column(0)
734            .as_any()
735            .downcast_ref::<StringArray>()
736            .unwrap();
737        assert_eq!("/s1", file_paths.value(0));
738        assert_eq!("/s2", file_paths.value(1));
739
740        let file_sizes = batch
741            .column(1)
742            .as_any()
743            .downcast_ref::<UInt64Array>()
744            .unwrap();
745        assert!(file_sizes.is_null(0));
746        assert_eq!(123, file_sizes.value(1));
747
748        let last_modified = batch
749            .column(2)
750            .as_any()
751            .downcast_ref::<TimestampMillisecondArray>()
752            .unwrap();
753        assert!(last_modified.is_null(0));
754        assert_eq!(456, last_modified.value(1));
755
756        let node_ids = batch
757            .column(3)
758            .as_any()
759            .downcast_ref::<UInt64Array>()
760            .unwrap();
761        assert_eq!(1, node_ids.value(0));
762        assert!(node_ids.is_null(1));
763    }
764
765    #[test]
766    fn test_puffin_index_meta_to_record_batch() {
767        let entries = vec![
768            PuffinIndexMetaEntry {
769                table_dir: "table1".to_string(),
770                index_file_path: "index1".to_string(),
771                region_id: RegionId::with_group_and_seq(10, 0, 20),
772                table_id: 10,
773                region_number: 20,
774                region_group: 0,
775                region_sequence: 20,
776                file_id: "file1".to_string(),
777                index_file_size: Some(1024),
778                index_type: "bloom_filter".to_string(),
779                target_type: "column".to_string(),
780                target_key: "1".to_string(),
781                target_json: "{\"column\":1}".to_string(),
782                blob_size: 256,
783                meta_json: Some("{\"bloom\":{}}".to_string()),
784                node_id: Some(42),
785            },
786            PuffinIndexMetaEntry {
787                table_dir: "table2".to_string(),
788                index_file_path: "index2".to_string(),
789                region_id: RegionId::with_group_and_seq(11, 0, 21),
790                table_id: 11,
791                region_number: 21,
792                region_group: 0,
793                region_sequence: 21,
794                file_id: "file2".to_string(),
795                index_file_size: None,
796                index_type: "inverted".to_string(),
797                target_type: "unknown".to_string(),
798                target_key: "legacy".to_string(),
799                target_json: "{}".to_string(),
800                blob_size: 0,
801                meta_json: None,
802                node_id: None,
803            },
804        ];
805
806        let schema = PuffinIndexMetaEntry::schema();
807        let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap();
808
809        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
810        assert_eq!(2, batch.num_rows());
811
812        let table_dirs = batch
813            .column(0)
814            .as_any()
815            .downcast_ref::<StringArray>()
816            .unwrap();
817        assert_eq!("table1", table_dirs.value(0));
818        assert_eq!("table2", table_dirs.value(1));
819
820        let index_file_paths = batch
821            .column(1)
822            .as_any()
823            .downcast_ref::<StringArray>()
824            .unwrap();
825        assert_eq!("index1", index_file_paths.value(0));
826        assert_eq!("index2", index_file_paths.value(1));
827
828        let region_ids = batch
829            .column(2)
830            .as_any()
831            .downcast_ref::<UInt64Array>()
832            .unwrap();
833        assert_eq!(
834            RegionId::with_group_and_seq(10, 0, 20).as_u64(),
835            region_ids.value(0)
836        );
837        assert_eq!(
838            RegionId::with_group_and_seq(11, 0, 21).as_u64(),
839            region_ids.value(1)
840        );
841
842        let table_ids = batch
843            .column(3)
844            .as_any()
845            .downcast_ref::<UInt32Array>()
846            .unwrap();
847        assert_eq!(10, table_ids.value(0));
848        assert_eq!(11, table_ids.value(1));
849
850        let region_numbers = batch
851            .column(4)
852            .as_any()
853            .downcast_ref::<UInt32Array>()
854            .unwrap();
855        assert_eq!(20, region_numbers.value(0));
856        assert_eq!(21, region_numbers.value(1));
857
858        let region_groups = batch
859            .column(5)
860            .as_any()
861            .downcast_ref::<UInt8Array>()
862            .unwrap();
863        assert_eq!(0, region_groups.value(0));
864        assert_eq!(0, region_groups.value(1));
865
866        let region_sequences = batch
867            .column(6)
868            .as_any()
869            .downcast_ref::<UInt32Array>()
870            .unwrap();
871        assert_eq!(20, region_sequences.value(0));
872        assert_eq!(21, region_sequences.value(1));
873
874        let file_ids = batch
875            .column(7)
876            .as_any()
877            .downcast_ref::<StringArray>()
878            .unwrap();
879        assert_eq!("file1", file_ids.value(0));
880        assert_eq!("file2", file_ids.value(1));
881
882        let index_file_sizes = batch
883            .column(8)
884            .as_any()
885            .downcast_ref::<UInt64Array>()
886            .unwrap();
887        assert_eq!(1024, index_file_sizes.value(0));
888        assert!(index_file_sizes.is_null(1));
889
890        let index_types = batch
891            .column(9)
892            .as_any()
893            .downcast_ref::<StringArray>()
894            .unwrap();
895        assert_eq!("bloom_filter", index_types.value(0));
896        assert_eq!("inverted", index_types.value(1));
897
898        let target_types = batch
899            .column(10)
900            .as_any()
901            .downcast_ref::<StringArray>()
902            .unwrap();
903        assert_eq!("column", target_types.value(0));
904        assert_eq!("unknown", target_types.value(1));
905
906        let target_keys = batch
907            .column(11)
908            .as_any()
909            .downcast_ref::<StringArray>()
910            .unwrap();
911        assert_eq!("1", target_keys.value(0));
912        assert_eq!("legacy", target_keys.value(1));
913
914        let target_json = batch
915            .column(12)
916            .as_any()
917            .downcast_ref::<StringArray>()
918            .unwrap();
919        assert_eq!("{\"column\":1}", target_json.value(0));
920        assert_eq!("{}", target_json.value(1));
921
922        let blob_sizes = batch
923            .column(13)
924            .as_any()
925            .downcast_ref::<UInt64Array>()
926            .unwrap();
927        assert_eq!(256, blob_sizes.value(0));
928        assert_eq!(0, blob_sizes.value(1));
929
930        let meta_jsons = batch
931            .column(14)
932            .as_any()
933            .downcast_ref::<StringArray>()
934            .unwrap();
935        assert_eq!("{\"bloom\":{}}", meta_jsons.value(0));
936        assert!(meta_jsons.is_null(1));
937
938        let node_ids = batch
939            .column(15)
940            .as_any()
941            .downcast_ref::<UInt64Array>()
942            .unwrap();
943        assert_eq!(42, node_ids.value(0));
944        assert!(node_ids.is_null(1));
945    }
946
947    #[test]
948    fn test_manifest_build_plan() {
949        // Note: filter must reference a column in the projected schema
950        let projection_input = Some(vec![0, 1, 2].into());
951        let request = ScanRequest {
952            projection_input,
953            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
954            limit: Some(5),
955            ..Default::default()
956        };
957
958        let plan = ManifestSstEntry::build_plan(request).unwrap();
959
960        // Expect plan to be Filter -> Limit -> TableScan or Filter+Limit wrapped.
961        // We'll pattern match to reach TableScan and verify key fields.
962        let (scan, has_filter, has_limit) = extract_scan(&plan);
963
964        assert!(has_filter);
965        assert!(has_limit);
966        assert_eq!(
967            scan.table_name,
968            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
969        );
970        assert_eq!(scan.projection, Some(vec![0, 1, 2]));
971
972        // projected schema should match projection
973        let fields = scan.projected_schema.fields();
974        assert_eq!(fields.len(), 3);
975        assert_eq!(fields[0].name(), "table_dir");
976        assert_eq!(fields[1].name(), "region_id");
977        assert_eq!(fields[2].name(), "table_id");
978    }
979
980    #[test]
981    fn test_storage_build_plan() {
982        let projection_input = Some(vec![0, 2].into());
983        let request = ScanRequest {
984            projection_input,
985            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
986            limit: Some(1),
987            ..Default::default()
988        };
989
990        let plan = StorageSstEntry::build_plan(request).unwrap();
991        let (scan, has_filter, has_limit) = extract_scan(&plan);
992        assert!(has_filter);
993        assert!(has_limit);
994        assert_eq!(
995            scan.table_name,
996            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
997        );
998        assert_eq!(scan.projection, Some(vec![0, 2]));
999
1000        let fields = scan.projected_schema.fields();
1001        assert_eq!(fields.len(), 2);
1002        assert_eq!(fields[0].name(), "file_path");
1003        assert_eq!(fields[1].name(), "last_modified_ms");
1004    }
1005
1006    // Helper to reach TableScan and detect presence of Filter/Limit in plan
1007    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
1008        use datafusion_expr::logical_plan::Limit;
1009
1010        match plan {
1011            LogicalPlan::Filter(f) => {
1012                let (scan, _, has_limit) = extract_scan(&f.input);
1013                (scan, true, has_limit)
1014            }
1015            LogicalPlan::Limit(Limit { input, .. }) => {
1016                let (scan, has_filter, _) = extract_scan(input);
1017                (scan, has_filter, true)
1018            }
1019            LogicalPlan::TableScan(scan) => (scan, false, false),
1020            other => panic!("unexpected plan: {other:?}"),
1021        }
1022    }
1023}