store_api/sst_entry.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_recordbatch::DfRecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datatypes::arrow::array::{
    ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
    UInt64Array,
};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow_array::StringArray;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Serialize};

use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest, TableId};

/// An entry describing an SST file known to the engine's manifest.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestSstEntry {
    /// The table directory this file belongs to.
    pub table_dir: String,
    /// The region id of the region that refers to the file.
    pub region_id: RegionId,
    /// The table id this file belongs to.
    pub table_id: TableId,
    /// The region number this file belongs to.
    pub region_number: RegionNumber,
    /// The region group this file belongs to.
    pub region_group: RegionGroup,
    /// The region sequence this file belongs to.
    pub region_sequence: RegionSeq,
    /// Engine-specific file identifier (string form).
    pub file_id: String,
    /// SST level.
    pub level: u8,
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes.
    pub file_size: u64,
    /// Full path of the index file in object store.
    pub index_file_path: Option<String>,
    /// File size of the index file in object store.
    pub index_file_size: Option<u64>,
    /// Number of rows in the SST.
    pub num_rows: u64,
    /// Number of row groups in the SST.
    pub num_row_groups: u64,
    /// Min timestamp.
    pub min_ts: Timestamp,
    /// Max timestamp.
    pub max_ts: Timestamp,
    /// The sequence number associated with this file.
    pub sequence: Option<u64>,
    /// The region id of the region that created the file.
    pub origin_region_id: RegionId,
    /// The node id fetched from the manifest.
    pub node_id: Option<u64>,
}

impl ManifestSstEntry {
    /// Returns the schema of the manifest sst entry.
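    ///
    /// Nullable columns mirror the `Option` fields of [`ManifestSstEntry`];
    /// `min_ts` and `max_ts` are nullable as well because converting them to
    /// nanoseconds in [`ManifestSstEntry::to_record_batch`] may yield `None`.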
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("table_dir", Ty::string_datatype(), false),
            ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
            ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
            ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
            ColumnSchema::new("file_id", Ty::string_datatype(), false),
            ColumnSchema::new("level", Ty::uint8_datatype(), false),
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
            ColumnSchema::new("index_file_path", Ty::string_datatype(), true),
            ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new("num_rows", Ty::uint64_datatype(), false),
            ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false),
            ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true),
            ColumnSchema::new("sequence", Ty::uint64_datatype(), true),
            ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts a list of manifest sst entries to a record batch.
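    ///
    /// Columns are emitted in the order defined by [`ManifestSstEntry::schema`].
    /// A minimal usage sketch (not compiled as a doctest; `entry` stands for
    /// any fully populated `ManifestSstEntry`):
    ///
    /// ```ignore
    /// let batch = ManifestSstEntry::to_record_batch(&[entry])?;
    /// assert_eq!(batch.num_rows(), 1);
    /// ```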
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
        let region_ids = entries.iter().map(|e| e.region_id.as_u64());
        let table_ids = entries.iter().map(|e| e.table_id);
        let region_numbers = entries.iter().map(|e| e.region_number);
        let region_groups = entries.iter().map(|e| e.region_group);
        let region_sequences = entries.iter().map(|e| e.region_sequence);
        let file_ids = entries.iter().map(|e| e.file_id.as_str());
        let levels = entries.iter().map(|e| e.level);
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let index_file_paths = entries.iter().map(|e| e.index_file_path.as_ref());
        let index_file_sizes = entries.iter().map(|e| e.index_file_size);
        let num_rows = entries.iter().map(|e| e.num_rows);
        let num_row_groups = entries.iter().map(|e| e.num_row_groups);
        let min_ts = entries.iter().map(|e| {
            e.min_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let max_ts = entries.iter().map(|e| {
            e.max_ts
                .convert_to(TimeUnit::Nanosecond)
                .map(|ts| ts.value())
        });
        let sequences = entries.iter().map(|e| e.sequence);
        let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64());
        let node_ids = entries.iter().map(|e| e.node_id);

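        // Column order below must match `Self::schema`.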
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(table_dirs)),
            Arc::new(UInt64Array::from_iter_values(region_ids)),
            Arc::new(UInt32Array::from_iter_values(table_ids)),
            Arc::new(UInt32Array::from_iter_values(region_numbers)),
            Arc::new(UInt8Array::from_iter_values(region_groups)),
            Arc::new(UInt32Array::from_iter_values(region_sequences)),
            Arc::new(StringArray::from_iter_values(file_ids)),
            Arc::new(UInt8Array::from_iter_values(levels)),
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter_values(file_sizes)),
            Arc::new(StringArray::from_iter(index_file_paths)),
            Arc::new(UInt64Array::from_iter(index_file_sizes)),
            Arc::new(UInt64Array::from_iter_values(num_rows)),
            Arc::new(UInt64Array::from_iter_values(num_row_groups)),
            Arc::new(TimestampNanosecondArray::from_iter(min_ts)),
            Arc::new(TimestampNanosecondArray::from_iter(max_ts)),
            Arc::new(UInt64Array::from_iter(sequences)),
            Arc::new(UInt64Array::from_iter_values(origin_region_ids)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Reserved internal inspect table name.
    ///
    /// This table name is used only for building logical plans on the
    /// frontend -> datanode path. It is not user-visible and cannot be
    /// referenced by user queries.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_manifest"
    }

    /// Builds a logical plan for scanning the manifest sst entries.
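    ///
    /// The resulting plan, from the root down, is an optional `Limit`, one
    /// `Filter` per expression in `scan_request.filters`, and a `TableScan`
    /// over [`ManifestSstEntry::reserved_table_name_for_inspection`].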
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

/// An entry describing an SST file listed directly from the storage layer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageSstEntry {
    /// Full path of the SST file in object store.
    pub file_path: String,
    /// File size in bytes.
    pub file_size: Option<u64>,
    /// Last modified time in milliseconds since epoch, if available from storage.
    pub last_modified_ms: Option<Timestamp>,
    /// The node id fetched from the manifest.
    pub node_id: Option<u64>,
}

impl StorageSstEntry {
    /// Returns the schema of the storage sst entry.
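    ///
    /// Only the `file_path` column is non-nullable; the other columns mirror
    /// the `Option` fields of [`StorageSstEntry`].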
    pub fn schema() -> SchemaRef {
        use datatypes::prelude::ConcreteDataType as Ty;
        Arc::new(Schema::new(vec![
            ColumnSchema::new("file_path", Ty::string_datatype(), false),
            ColumnSchema::new("file_size", Ty::uint64_datatype(), true),
            ColumnSchema::new(
                "last_modified_ms",
                Ty::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
        ]))
    }

    /// Converts a list of storage sst entries to a record batch.
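    ///
    /// Columns are emitted in the order defined by [`StorageSstEntry::schema`].
    /// `last_modified_ms` values are converted to milliseconds and become null
    /// when the timestamp is absent or the conversion fails.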
    pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
        let schema = Self::schema();
        let file_paths = entries.iter().map(|e| e.file_path.as_str());
        let file_sizes = entries.iter().map(|e| e.file_size);
        let last_modified_ms = entries.iter().map(|e| {
            e.last_modified_ms
                .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value()))
        });
        let node_ids = entries.iter().map(|e| e.node_id);

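        // Column order below must match `Self::schema`.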
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(file_paths)),
            Arc::new(UInt64Array::from_iter(file_sizes)),
            Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)),
            Arc::new(UInt64Array::from_iter(node_ids)),
        ];

        DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
    }

    /// Reserved internal inspect table name.
    ///
    /// This table name is used only for building logical plans on the
    /// frontend -> datanode path. It is not user-visible and cannot be
    /// referenced by user queries.
    pub fn reserved_table_name_for_inspection() -> &'static str {
        "__inspect/__mito/__sst_storage"
    }

    /// Builds a logical plan for scanning the storage sst entries.
    pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
        build_plan_helper(
            scan_request,
            Self::reserved_table_name_for_inspection(),
            Self::schema(),
        )
    }
}

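/// Builds a `TableScan` over `table_name` with the request's projection,
/// wraps it in one `Filter` node per filter expression, and adds a `Limit`
/// node when `scan_request.limit` is set.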
fn build_plan_helper(
    scan_request: ScanRequest,
    table_name: &str,
    schema: SchemaRef,
) -> Result<LogicalPlan, DataFusionError> {
    let table_source = LogicalTableSource::new(schema.arrow_schema().clone());

    let mut builder = LogicalPlanBuilder::scan(
        table_name,
        Arc::new(table_source),
        scan_request.projection.clone(),
    )?;

    for filter in scan_request.filters {
        builder = builder.filter(filter)?;
    }

    if let Some(limit) = scan_request.limit {
        builder = builder.limit(0, Some(limit))?;
    }

    builder.build()
}

#[cfg(test)]
mod tests {
    use datafusion_common::TableReference;
    use datafusion_expr::{LogicalPlan, Operator, binary_expr, col, lit};
    use datatypes::arrow::array::{
        Array, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_sst_entry_manifest_to_record_batch() {
        // Prepare entries
        let table_id1: TableId = 1;
        let region_group1: RegionGroup = 2;
        let region_seq1: RegionSeq = 3;
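        // The region number packs the group into the high 8 bits and the
        // sequence into the low 24 bits.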
        let region_number1: RegionNumber = ((region_group1 as u32) << 24) | region_seq1;
        let region_id1 = RegionId::with_group_and_seq(table_id1, region_group1, region_seq1);

        let table_id2: TableId = 5;
        let region_group2: RegionGroup = 1;
        let region_seq2: RegionSeq = 42;
        let region_number2: RegionNumber = ((region_group2 as u32) << 24) | region_seq2;
        let region_id2 = RegionId::with_group_and_seq(table_id2, region_group2, region_seq2);

        let entries = vec![
            ManifestSstEntry {
                table_dir: "tdir1".to_string(),
                region_id: region_id1,
                table_id: table_id1,
                region_number: region_number1,
                region_group: region_group1,
                region_sequence: region_seq1,
                file_id: "f1".to_string(),
                level: 1,
                file_path: "/p1".to_string(),
                file_size: 100,
                index_file_path: None,
                index_file_size: None,
                num_rows: 10,
                num_row_groups: 2,
                min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns
                max_ts: Timestamp::new_second(2),         // 2s -> 2_000_000_000ns
                sequence: None,
                origin_region_id: region_id1,
                node_id: Some(1),
            },
            ManifestSstEntry {
                table_dir: "tdir2".to_string(),
                region_id: region_id2,
                table_id: table_id2,
                region_number: region_number2,
                region_group: region_group2,
                region_sequence: region_seq2,
                file_id: "f2".to_string(),
                level: 3,
                file_path: "/p2".to_string(),
                file_size: 200,
                index_file_path: Some("idx".to_string()),
                index_file_size: Some(11),
                num_rows: 20,
                num_row_groups: 4,
                min_ts: Timestamp::new_nanosecond(5),     // 5ns
                max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns
                sequence: Some(9),
                origin_region_id: region_id2,
                node_id: None,
            },
        ];

        let schema = ManifestSstEntry::schema();
        let batch = ManifestSstEntry::to_record_batch(&entries).unwrap();

        // Schema checks
        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());
        for (i, f) in schema.arrow_schema().fields().iter().enumerate() {
            assert_eq!(f.name(), batch.schema().field(i).name());
            assert_eq!(f.is_nullable(), batch.schema().field(i).is_nullable());
            assert_eq!(f.data_type(), batch.schema().field(i).data_type());
        }

        // Column asserts
        let table_dirs = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("tdir1", table_dirs.value(0));
        assert_eq!("tdir2", table_dirs.value(1));

        let region_ids = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), region_ids.value(0));
        assert_eq!(region_id2.as_u64(), region_ids.value(1));

        let table_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(table_id1, table_ids.value(0));
        assert_eq!(table_id2, table_ids.value(1));

        let region_numbers = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_number1, region_numbers.value(0));
        assert_eq!(region_number2, region_numbers.value(1));

        let region_groups = batch
            .column(4)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(region_group1, region_groups.value(0));
        assert_eq!(region_group2, region_groups.value(1));

        let region_sequences = batch
            .column(5)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(region_seq1, region_sequences.value(0));
        assert_eq!(region_seq2, region_sequences.value(1));

        let file_ids = batch
            .column(6)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("f1", file_ids.value(0));
        assert_eq!("f2", file_ids.value(1));

        let levels = batch
            .column(7)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(1, levels.value(0));
        assert_eq!(3, levels.value(1));

        let file_paths = batch
            .column(8)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/p1", file_paths.value(0));
        assert_eq!("/p2", file_paths.value(1));

        let file_sizes = batch
            .column(9)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(100, file_sizes.value(0));
        assert_eq!(200, file_sizes.value(1));

        let index_file_paths = batch
            .column(10)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(index_file_paths.is_null(0));
        assert_eq!("idx", index_file_paths.value(1));

        let index_file_sizes = batch
            .column(11)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(index_file_sizes.is_null(0));
        assert_eq!(11, index_file_sizes.value(1));

        let num_rows = batch
            .column(12)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(10, num_rows.value(0));
        assert_eq!(20, num_rows.value(1));

        let num_row_groups = batch
            .column(13)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(2, num_row_groups.value(0));
        assert_eq!(4, num_row_groups.value(1));

        let min_ts = batch
            .column(14)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(1_000_000_000, min_ts.value(0));
        assert_eq!(5, min_ts.value(1));

        let max_ts = batch
            .column(15)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert_eq!(2_000_000_000, max_ts.value(0));
        assert_eq!(2_000_000, max_ts.value(1));

        let sequences = batch
            .column(16)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(sequences.is_null(0));
        assert_eq!(9, sequences.value(1));

        let origin_region_ids = batch
            .column(17)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(region_id1.as_u64(), origin_region_ids.value(0));
        assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));

        let node_ids = batch
            .column(18)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_sst_entry_storage_to_record_batch() {
        let entries = vec![
            StorageSstEntry {
                file_path: "/s1".to_string(),
                file_size: None,
                last_modified_ms: None,
                node_id: Some(1),
            },
            StorageSstEntry {
                file_path: "/s2".to_string(),
                file_size: Some(123),
                last_modified_ms: Some(Timestamp::new_millisecond(456)),
                node_id: None,
            },
        ];

        let schema = StorageSstEntry::schema();
        let batch = StorageSstEntry::to_record_batch(&entries).unwrap();

        assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
        assert_eq!(2, batch.num_rows());

        let file_paths = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!("/s1", file_paths.value(0));
        assert_eq!("/s2", file_paths.value(1));

        let file_sizes = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert!(file_sizes.is_null(0));
        assert_eq!(123, file_sizes.value(1));

        let last_modified = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert!(last_modified.is_null(0));
        assert_eq!(456, last_modified.value(1));

        let node_ids = batch
            .column(3)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(1, node_ids.value(0));
        assert!(node_ids.is_null(1));
    }

    #[test]
    fn test_manifest_build_plan() {
        // Note: filter must reference a column in the projected schema
        let request = ScanRequest {
            projection: Some(vec![0, 1, 2]),
            filters: vec![binary_expr(col("table_id"), Operator::Gt, lit(0))],
            limit: Some(5),
            ..Default::default()
        };

        let plan = ManifestSstEntry::build_plan(request).unwrap();

        // Expected shape, from the root down: Limit -> Filter -> TableScan.
        // Pattern match down to the TableScan and verify its key fields.
        let (scan, has_filter, has_limit) = extract_scan(&plan);

        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(ManifestSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 1, 2]));

        // projected schema should match projection
        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].name(), "table_dir");
        assert_eq!(fields[1].name(), "region_id");
        assert_eq!(fields[2].name(), "table_id");
    }

    #[test]
    fn test_storage_build_plan() {
        let request = ScanRequest {
            projection: Some(vec![0, 2]),
            filters: vec![binary_expr(col("file_path"), Operator::Eq, lit("/a"))],
            limit: Some(1),
            ..Default::default()
        };

        let plan = StorageSstEntry::build_plan(request).unwrap();
        let (scan, has_filter, has_limit) = extract_scan(&plan);
        assert!(has_filter);
        assert!(has_limit);
        assert_eq!(
            scan.table_name,
            TableReference::bare(StorageSstEntry::reserved_table_name_for_inspection())
        );
        assert_eq!(scan.projection, Some(vec![0, 2]));

        let fields = scan.projected_schema.fields();
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].name(), "file_path");
        assert_eq!(fields[1].name(), "last_modified_ms");
    }

    // Helper to reach TableScan and detect presence of Filter/Limit in plan
    fn extract_scan(plan: &LogicalPlan) -> (&datafusion_expr::logical_plan::TableScan, bool, bool) {
        use datafusion_expr::logical_plan::Limit;

        match plan {
            LogicalPlan::Filter(f) => {
                let (scan, _, has_limit) = extract_scan(&f.input);
                (scan, true, has_limit)
            }
            LogicalPlan::Limit(Limit { input, .. }) => {
                let (scan, has_filter, _) = extract_scan(input);
                (scan, has_filter, true)
            }
            LogicalPlan::TableScan(scan) => (scan, false, false),
            other => panic!("unexpected plan: {other:?}"),
        }
    }
}