store_api/
sst_entry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_recordbatch::DfRecordBatch;
18use common_time::Timestamp;
19use common_time::timestamp::TimeUnit;
20use datafusion_common::DataFusionError;
21use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
22use datatypes::arrow::array::{
23    ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array,
24    UInt32Array, UInt64Array,
25};
26use datatypes::arrow::error::ArrowError;
27use datatypes::arrow_array::StringArray;
28use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
29use serde::{Deserialize, Serialize};
30
31use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};
32
/// An entry describing an SST file known by the engine's manifest.
///
/// A list of entries can be exported as a record batch via
/// [`ManifestSstEntry::to_record_batch`], using the column layout returned by
/// [`ManifestSstEntry::schema`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// The table directory this file belongs to.
    pub table_dir: String,
    /// The region id of the region that refers to the file.
    pub region_id: RegionId,
    /// The table id this file belongs to.
    pub table_id: TableId,
    /// The region number this file belongs to.
    pub region_number: RegionNumber,
    /// The region group this file belongs to.
    pub region_group: RegionGroup,
    /// The region sequence this file belongs to.
    pub region_sequence: RegionSeq,
    /// Engine-specific file identifier (string form).
    pub file_id: String,
    /// SST level.
    pub level: u8,
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes.
    pub file_size: u64,
    /// Full path of the index file in object store, if the file has one.
    pub index_file_path: Option<String>,
    /// Size in bytes of the index file in object store, if the file has one.
    pub index_file_size: Option<u64>,
    /// Number of rows in the SST.
    pub num_rows: u64,
    /// Number of row groups in the SST.
    pub num_row_groups: u64,
    /// Minimum timestamp (exported as nanoseconds by `to_record_batch`).
    pub min_ts: Timestamp,
    /// Maximum timestamp (exported as nanoseconds by `to_record_batch`).
    pub max_ts: Timestamp,
    /// The sequence number associated with this file, if any.
    pub sequence: Option<u64>,
    /// The region id of the region that created the file.
    pub origin_region_id: RegionId,
    /// The node id fetched from the manifest, if any.
    pub node_id: Option<u64>,
    /// Whether this file is visible in the current version.
    pub visible: bool,
}
77
78impl ManifestSstEntry {
79    /// Returns the schema of the manifest sst entry.
80    pub fn schema() -> SchemaRef {
81        use datatypes::prelude::ConcreteDataType as Ty;
82        Arc::new(Schema::new(vec![
83            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
84            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
85            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
86            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
87            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
88            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
89            ColumnSchema::new("file_id", Ty::string_datatype(), false),
90            ColumnSchema::new("level", Ty::uint8_datatype(), false),
91            ColumnSchema::new("file_path", Ty::string_datatype(), false),
92            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
93            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
94            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
95            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
96            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
97            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
98            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
99            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
100            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
101            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
102            ColumnSchema::new("visible", Ty::boolean_datatype(), false),
103        ]))
104    }
105
106    /// Converts a list of manifest sst entries to a record batch.
107    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
108        let schema = Self::schema();
109        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
110        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
111        let table_ids = entries.iter().map(|e| e.table_id);
112        let region_numbers = entries.iter().map(|e| e.region_number);
113        let region_groups = entries.iter().map(|e| e.region_group);
114        let region_sequences = entries.iter().map(|e| e.region_sequence);
115        let file_ids = entries.iter().map(|e| e.file_id.as_str());
116        let levels = entries.iter().map(|e| e.level);
117        let file_paths = entries.iter().map(|e| e.file_path.as_str());
118        let file_sizes = entries.iter().map(|e| e.file_size);
119        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
120        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
121        let num_rows = entries.iter().map(|e| e.num_rows);
122        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
123        let min_ts = entries.iter().map(|e| {
124            e.min_ts
125                .convert_to(TimeUnit::Nanosecond)
126                .map(|ts| ts.value())
127        });
128        let max_ts = entries.iter().map(|e| {
129            e.max_ts
130                .convert_to(TimeUnit::Nanosecond)
131                .map(|ts| ts.value())
132        });
133        let sequences = entries.iter().map(|e| e.sequence);
134        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
135        let node_ids = entries.iter().map(|e| e.node_id);
136        let visible_flags = entries.iter().map(|e| Some(e.visible));
137
138        let columns: Vec<ArrayRef> = vec![
139            Arc::new(StringArray::from_iter_values(table_dirs)),
140            Arc::new(UInt64Array::from_iter_values(region_ids)),
141            Arc::new(UInt32Array::from_iter_values(table_ids)),
142            Arc::new(UInt32Array::from_iter_values(region_numbers)),
143            Arc::new(UInt8Array::from_iter_values(region_groups)),
144            Arc::new(UInt32Array::from_iter_values(region_sequences)),
145            Arc::new(StringArray::from_iter_values(file_ids)),
146            Arc::new(UInt8Array::from_iter_values(levels)),
147            Arc::new(StringArray::from_iter_values(file_paths)),
148            Arc::new(UInt64Array::from_iter_values(file_sizes)),
149            Arc::new(StringArray::from_iter(index_file_paths)),
150            Arc::new(UInt64Array::from_iter(index_file_sizes)),
151            Arc::new(UInt64Array::from_iter_values(num_rows)),
152            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
153            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
154            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
155            Arc::new(UInt64Array::from_iter(sequences)),
156            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
157            Arc::new(UInt64Array::from_iter(node_ids)),
158            Arc::new(BooleanArray::from_iter(visible_flags)),
159        ];
160
161        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
162    }
163
164    /// Reserved internal inspect table name.
165    ///
166    /// This table name is used only for building logical plans on the
167    /// frontend -> datanode path. It is not user-visible and cannot be
168    /// referenced by user queries.
169    pub fn reserved_table_name_for_inspection() -> &'static str {
170        "__inspect/__mito/__sst_manifest"
171    }
172
173    /// Builds a logical plan for scanning the manifest sst entries.
174    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
175        build_plan_helper(
176            scan_request,
177            Self::reserved_table_name_for_inspection(),
178            Self::schema(),
179        )
180    }
181}
182
/// An entry describing an SST file listed directly from the storage layer.
///
/// A list of entries can be exported as a record batch via
/// [`StorageSstEntry::to_record_batch`], using the column layout returned by
/// [`StorageSstEntry::schema`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes, if known.
    pub file_size: Option<u64>,
    /// Last modified time, if available from storage.
    ///
    /// Despite the `_ms` suffix, the stored `Timestamp` may use any unit;
    /// `to_record_batch` converts it to milliseconds since epoch.
    pub last_modified_ms: Option<Timestamp>,
    /// The node id fetched from the manifest.
    pub node_id: Option<u64>,
}
195
196impl StorageSstEntry {
197    /// Returns the schema of the storage sst entry.
198    pub fn schema() -> SchemaRef {
199        use datatypes::prelude::ConcreteDataType as Ty;
200        Arc::new(Schema::new(vec![
201            ColumnSchema::new("file_path", Ty::string_datatype(), false),
202            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
203            ColumnSchema::new(
204                "last_modified_ms",
205                Ty::timestamp_millisecond_datatype(),
206                true,
207            ),
208            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
209        ]))
210    }
211
212    /// Converts a list of storage sst entries to a record batch.
213    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
214        let schema = Self::schema();
215        let file_paths = entries.iter().map(|e| e.file_path.as_str());
216        let file_sizes = entries.iter().map(|e| e.file_size);
217        let last_modified_ms = entries.iter().map(|e| {
218            e.last_modified_ms
219                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
220        });
221        let node_ids = entries.iter().map(|e| e.node_id);
222
223        let columns: Vec<ArrayRef> = vec![
224            Arc::new(StringArray::from_iter_values(file_paths)),
225            Arc::new(UInt64Array::from_iter(file_sizes)),
226            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
227            Arc::new(UInt64Array::from_iter(node_ids)),
228        ];
229
230        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
231    }
232
233    /// Reserved internal inspect table name.
234    ///
235    /// This table name is used only for building logical plans on the
236    /// frontend -> datanode path. It is not user-visible and cannot be
237    /// referenced by user queries.
238    pub fn reserved_table_name_for_inspection() -> &'static str {
239        "__inspect/__mito/__sst_storage"
240    }
241
242    /// Builds a logical plan for scanning the storage sst entries.
243    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
244        build_plan_helper(
245            scan_request,
246            Self::reserved_table_name_for_inspection(),
247            Self::schema(),
248        )
249    }
250}
251
252fn build_plan_helper(
253    scan_request: ScanRequest,
254    table_name: &str,
255    schema: SchemaRef,
256) -> Result<LogicalPlan, DataFusionError> {
257    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());
258
259    let mut builder = LogicalPlanBuilder::scan(
260        table_name,
261        Arc::new(table_source),
262        scan_request.projection.clone(),
263    )?;
264
265    for filter in scan_request.filters {
266        builder = builder.filter(filter)?;
267    }
268
269    if let Some(limit) = scan_request.limit {
270        builder = builder.limit(0, Some(limit))?;
271    }
272
273    builder.build()
274}
275
#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::logical_plan::{Limit, TableScan};
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    /// Downcasts column `idx` of `batch` to the concrete array type `A`,
    /// panicking if the column has a different type.
    fn column_as<A: 'static>(batch: &DfRecordBatch, idx: usize) -> &A {
        batch.column(idx).as_any().downcast_ref::<A>().unwrap()
    }

    /// Composes a region number from its group (top 8 bits) and sequence.
    fn region_number_of(group: RegionGroup, seq: RegionSeq) -> RegionNumber {
        ((group as u32) << 24) | seq
    }

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        // Two entries: one with all optionals unset, one with them set, and
        // timestamps in a mix of units.
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
        let region_number1 = region_number_of(region_group1, region_seq1);
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2 = region_number_of(region_group2, region_seq2);
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns
                max_ts: Timestamp::new_second(2),         // 2s -> 2_000_000_000ns
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
                visible: false,
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                min_ts: Timestamp::new_nanosecond(5),     // 5ns
                max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
                visible: true,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        // The batch must match the declared schema field by field.
        let expected_fields = schema.arrow_schema().fields();
        assert_eq!(expected_fields.len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        let batch_schema = batch.schema();
        for (i, expected) in expected_fields.iter().enumerate() {
            let actual = batch_schema.field(i);
            assert_eq!(expected.name(), actual.name());
            assert_eq!(expected.is_nullable(), actual.is_nullable());
            assert_eq!(expected.data_type(), actual.data_type());
        }

        let table_dirs = column_as::<StringArray>(&batch, 0);
        assert_eq!(("tdir1", "tdir2"), (table_dirs.value(0), table_dirs.value(1)));

        let region_ids = column_as::<UInt64Array>(&batch, 1);
        assert_eq!(
            (region_id1.as_u64(), region_id2.as_u64()),
            (region_ids.value(0), region_ids.value(1))
        );

        let table_ids = column_as::<UInt32Array>(&batch, 2);
        assert_eq!((table_id1, table_id2), (table_ids.value(0), table_ids.value(1)));

        let region_numbers = column_as::<UInt32Array>(&batch, 3);
        assert_eq!(
            (region_number1, region_number2),
            (region_numbers.value(0), region_numbers.value(1))
        );

        let region_groups = column_as::<UInt8Array>(&batch, 4);
        assert_eq!(
            (region_group1, region_group2),
            (region_groups.value(0), region_groups.value(1))
        );

        let region_sequences = column_as::<UInt32Array>(&batch, 5);
        assert_eq!(
            (region_seq1, region_seq2),
            (region_sequences.value(0), region_sequences.value(1))
        );

        let file_ids = column_as::<StringArray>(&batch, 6);
        assert_eq!(("f1", "f2"), (file_ids.value(0), file_ids.value(1)));

        let levels = column_as::<UInt8Array>(&batch, 7);
        assert_eq!((1, 3), (levels.value(0), levels.value(1)));

        let file_paths = column_as::<StringArray>(&batch, 8);
        assert_eq!(("/p1", "/p2"), (file_paths.value(0), file_paths.value(1)));

        let file_sizes = column_as::<UInt64Array>(&batch, 9);
        assert_eq!((100, 200), (file_sizes.value(0), file_sizes.value(1)));

        let index_file_paths = column_as::<StringArray>(&batch, 10);
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = column_as::<UInt64Array>(&batch, 11);
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = column_as::<UInt64Array>(&batch, 12);
        assert_eq!((10, 20), (num_rows.value(0), num_rows.value(1)));

        let num_row_groups = column_as::<UInt64Array>(&batch, 13);
        assert_eq!((2, 4), (num_row_groups.value(0), num_row_groups.value(1)));

        // Timestamps of any unit are normalized to nanoseconds.
        let min_ts = column_as::<TimestampNanosecondArray>(&batch, 14);
        assert_eq!((1_000_000_000, 5), (min_ts.value(0), min_ts.value(1)));

        let max_ts = column_as::<TimestampNanosecondArray>(&batch, 15);
        assert_eq!((2_000_000_000, 2_000_000), (max_ts.value(0), max_ts.value(1)));

        let sequences = column_as::<UInt64Array>(&batch, 16);
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = column_as::<UInt64Array>(&batch, 17);
        assert_eq!(
            (region_id1.as_u64(), region_id2.as_u64()),
            (origin_region_ids.value(0), origin_region_ids.value(1))
        );

        let node_ids = column_as::<UInt64Array>(&batch, 18);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));

        let visible = column_as::<BooleanArray>(&batch, 19);
        assert!(!visible.value(0));
        assert!(visible.value(1));
    }

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = column_as::<StringArray>(&batch, 0);
        assert_eq!(("/s1", "/s2"), (file_paths.value(0), file_paths.value(1)));

        let file_sizes = column_as::<UInt64Array>(&batch, 1);
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = column_as::<TimestampMillisecondArray>(&batch, 2);
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = column_as::<UInt64Array>(&batch, 3);
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        // Note: filter must reference a column in the projected schema
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        // The projected schema holds exactly the requested columns, in order.
        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        for (field, expected) in fields.iter().zip(["table_dir", "region_id", "table_id"]) {
            assert_eq!(field.name(), expected);
        }
    }

    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        for (field, expected) in fields.iter().zip(["file_path", "last_modified_ms"]) {
            assert_eq!(field.name(), expected);
        }
    }

    /// Walks down through `Filter` and `Limit` nodes to the underlying
    /// `TableScan`, reporting whether any `Filter` / `Limit` was encountered.
    /// Panics on any other kind of plan node.
    fn extract_scan(plan: &LogicalPlan) -> (&TableScan, bool, bool) {
        let mut node = plan;
        let mut has_filter = false;
        let mut has_limit = false;
        loop {
            match node {
                LogicalPlan::Filter(filter) => {
                    has_filter = true;
                    node = &*filter.input;
                }
                LogicalPlan::Limit(Limit { input, .. }) => {
                    has_limit = true;
                    node = &**input;
                }
                LogicalPlan::TableScan(scan) => return (scan, has_filter, has_limit),
                other => panic!("unexpected plan: {other:?}"),
            }
        }
    }
}
654}