Skip to main content

mito2/sst/parquet/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet reader.
16
17mod stream;
18
19#[cfg(feature = "vector_index")]
20use std::collections::BTreeSet;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{error, tracing, warn};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::{Expr, Volatility};
32use datatypes::arrow::array::ArrayRef;
33use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::data_type::ConcreteDataType;
36use datatypes::prelude::DataType;
37use datatypes::schema::ext::ArrowSchemaExt;
38use futures::StreamExt;
39use mito_codec::row_converter::build_primary_key_codec;
40use object_store::ObjectStore;
41use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
42use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema};
43use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
44use partition::expr::PartitionExpr;
45use snafu::ResultExt;
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
48use store_api::region_request::PathType;
49use store_api::storage::{ColumnId, FileId};
50use table::predicate::Predicate;
51
52use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream};
53use crate::cache::index::result_cache::PredicateKey;
54use crate::cache::{CacheStrategy, CachedSstMeta};
55#[cfg(feature = "vector_index")]
56use crate::error::ApplyVectorIndexSnafu;
57use crate::error::{
58    ParquetToArrowSchemaSnafu, ReadDataPartSnafu, Result, SerializePartitionExprSnafu,
59};
60use crate::metrics::{
61    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
62    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
63};
64use crate::read::flat_projection::CompactionProjectionMapper;
65use crate::read::prune::FlatPruneReader;
66use crate::read::read_columns::ReadColumns;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
70};
71use crate::sst::index::fulltext_index::applier::{
72    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
73};
74use crate::sst::index::inverted_index::applier::{
75    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
76};
77#[cfg(feature = "vector_index")]
78use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
79use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
80use crate::sst::parquet::file_range::{
81    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
82};
83use crate::sst::parquet::flat_format::FlatReadFormat;
84use crate::sst::parquet::format::need_override_sequence;
85use crate::sst::parquet::metadata::MetadataLoader;
86use crate::sst::parquet::prefilter::{
87    PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
88};
89use crate::sst::parquet::push_decoder::{
90    SstParquetRangeFetcher, build_sst_parquet_record_batch_stream,
91};
92use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
93use crate::sst::parquet::row_group::ParquetFetchMetrics;
94use crate::sst::parquet::row_selection::RowGroupSelection;
95use crate::sst::parquet::stats::RowGroupPruningStats;
96use crate::sst::tag_maybe_to_dictionary_field;
97
98const INDEX_TYPE_FULLTEXT: &str = "fulltext";
99const INDEX_TYPE_INVERTED: &str = "inverted";
100const INDEX_TYPE_BLOOM: &str = "bloom filter";
101const INDEX_TYPE_VECTOR: &str = "vector";
102
/// Reports a failure to apply an index to an SST file.
///
/// Test builds (`cfg(test)` or the `test` feature) panic so the failure cannot slip by
/// unnoticed; all other builds only log it as a warning.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if !cfg!(any(test, feature = "test")) {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        } else {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        }
    };
}
123
124/// Parquet SST reader builder.
125pub struct ParquetReaderBuilder {
126    /// SST directory.
127    table_dir: String,
128    /// Path type for generating file paths.
129    path_type: PathType,
130    file_handle: FileHandle,
131    object_store: ObjectStore,
132    /// Predicate to push down.
133    predicate: Option<Predicate>,
134    /// The columns to read.
135    ///
136    /// `None` reads all columns. Due to schema change, the projection
137    /// can contain columns not in the parquet file.
138    read_cols: Option<ReadColumns>,
139    /// Strategy to cache SST data.
140    cache_strategy: CacheStrategy,
141    /// Index appliers.
142    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
143    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
144    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
145    /// Vector index applier for KNN search.
146    #[cfg(feature = "vector_index")]
147    vector_index_applier: Option<VectorIndexApplierRef>,
148    /// Over-fetched k for vector index scan.
149    #[cfg(feature = "vector_index")]
150    vector_index_k: Option<usize>,
151    /// Expected metadata of the region while reading the SST.
152    /// This is usually the latest metadata of the region. The reader use
153    /// it get the correct column id of a column by name.
154    expected_metadata: Option<RegionMetadataRef>,
155    /// Whether this reader is for compaction.
156    compaction: bool,
157    /// Mode to pre-filter columns.
158    pre_filter_mode: PreFilterMode,
159    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
160    decode_primary_key_values: bool,
161    page_index_policy: PageIndexPolicy,
162    defer_optional_page_index: bool,
163}
164
165impl ParquetReaderBuilder {
166    /// Returns a new [ParquetReaderBuilder] to read specific SST.
167    pub fn new(
168        table_dir: String,
169        path_type: PathType,
170        file_handle: FileHandle,
171        object_store: ObjectStore,
172    ) -> ParquetReaderBuilder {
173        ParquetReaderBuilder {
174            table_dir,
175            path_type,
176            file_handle,
177            object_store,
178            predicate: None,
179            read_cols: None,
180            cache_strategy: CacheStrategy::Disabled,
181            inverted_index_appliers: [None, None],
182            bloom_filter_index_appliers: [None, None],
183            fulltext_index_appliers: [None, None],
184            #[cfg(feature = "vector_index")]
185            vector_index_applier: None,
186            #[cfg(feature = "vector_index")]
187            vector_index_k: None,
188            expected_metadata: None,
189            compaction: false,
190            pre_filter_mode: PreFilterMode::All,
191            decode_primary_key_values: false,
192            page_index_policy: Default::default(),
193            defer_optional_page_index: false,
194        }
195    }
196
197    /// Attaches the predicate to the builder.
198    #[must_use]
199    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
200        self.predicate = predicate;
201        self
202    }
203
204    /// Attaches the projection to the builder.
205    ///
206    /// The reader only applies the projection to fields.
207    #[must_use]
208    pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
209        self.read_cols = read_cols;
210        self
211    }
212
213    /// Attaches the cache to the builder.
214    #[must_use]
215    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
216        self.cache_strategy = cache;
217        self
218    }
219
220    /// Attaches the inverted index appliers to the builder.
221    #[must_use]
222    pub(crate) fn inverted_index_appliers(
223        mut self,
224        index_appliers: [Option<InvertedIndexApplierRef>; 2],
225    ) -> Self {
226        self.inverted_index_appliers = index_appliers;
227        self
228    }
229
230    /// Attaches the bloom filter index appliers to the builder.
231    #[must_use]
232    pub(crate) fn bloom_filter_index_appliers(
233        mut self,
234        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
235    ) -> Self {
236        self.bloom_filter_index_appliers = index_appliers;
237        self
238    }
239
240    /// Attaches the fulltext index appliers to the builder.
241    #[must_use]
242    pub(crate) fn fulltext_index_appliers(
243        mut self,
244        index_appliers: [Option<FulltextIndexApplierRef>; 2],
245    ) -> Self {
246        self.fulltext_index_appliers = index_appliers;
247        self
248    }
249
250    /// Attaches the vector index applier to the builder.
251    #[cfg(feature = "vector_index")]
252    #[must_use]
253    pub(crate) fn vector_index_applier(
254        mut self,
255        applier: Option<VectorIndexApplierRef>,
256        k: Option<usize>,
257    ) -> Self {
258        self.vector_index_applier = applier;
259        self.vector_index_k = k;
260        self
261    }
262
263    /// Attaches the expected metadata to the builder.
264    #[must_use]
265    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
266        self.expected_metadata = expected_metadata;
267        self
268    }
269
270    /// Sets the compaction flag.
271    #[must_use]
272    pub fn compaction(mut self, compaction: bool) -> Self {
273        self.compaction = compaction;
274        self
275    }
276
277    /// Sets the pre-filter mode.
278    #[must_use]
279    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
280        self.pre_filter_mode = pre_filter_mode;
281        self
282    }
283
284    /// Decodes primary key values eagerly when reading primary key format SSTs.
285    #[must_use]
286    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
287        self.decode_primary_key_values = decode;
288        self
289    }
290
291    #[must_use]
292    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
293        self.page_index_policy = page_index_policy;
294        self
295    }
296
297    /// Defers loading optional page indexes until row-level selections can use them.
298    #[must_use]
299    pub(crate) fn deferred_optional_page_index(mut self) -> Self {
300        self.page_index_policy = PageIndexPolicy::Optional;
301        self.defer_optional_page_index = true;
302        self
303    }
304
305    /// Builds a [ParquetReader].
306    ///
307    /// This needs to perform IO operation.
308    #[tracing::instrument(
309        skip_all,
310        fields(
311            region_id = %self.file_handle.region_id(),
312            file_id = %self.file_handle.file_id()
313        )
314    )]
315    pub async fn build(&self) -> Result<Option<ParquetReader>> {
316        let mut metrics = ReaderMetrics::default();
317
318        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
319            return Ok(None);
320        };
321        ParquetReader::new(Arc::new(context), selection)
322            .await
323            .map(Some)
324    }
325
326    /// Builds a [FileRangeContext] and collects row groups to read.
327    ///
328    /// This needs to perform IO operation.
329    #[tracing::instrument(
330        skip_all,
331        fields(
332            region_id = %self.file_handle.region_id(),
333            file_id = %self.file_handle.file_id()
334        )
335    )]
336    pub(crate) async fn build_reader_input(
337        &self,
338        metrics: &mut ReaderMetrics,
339    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
340        self.build_reader_input_inner(metrics).await
341    }
342
343    async fn build_reader_input_inner(
344        &self,
345        metrics: &mut ReaderMetrics,
346    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
347        let start = Instant::now();
348
349        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
350        let file_size = self.file_handle.meta_ref().file_size;
351
352        // Loads parquet metadata of the file.
353        let initial_page_index_policy = if self.defer_optional_page_index
354            && self.page_index_policy == PageIndexPolicy::Optional
355        {
356            PageIndexPolicy::Skip
357        } else {
358            self.page_index_policy
359        };
360        let (sst_meta, mut cache_miss) = self
361            .read_parquet_metadata(
362                &file_path,
363                file_size,
364                &mut metrics.metadata_cache_metrics,
365                initial_page_index_policy,
366            )
367            .await?;
368        let mut parquet_meta = sst_meta.parquet_metadata();
369        let region_meta = sst_meta.region_metadata();
370        let region_partition_expr_str = self
371            .expected_metadata
372            .as_ref()
373            .and_then(|meta| meta.partition_expr.as_ref())
374            .map(|expr| expr.as_str());
375        let (_, is_same_region_partition) = Self::is_same_region_partition(
376            region_partition_expr_str,
377            self.file_handle.meta_ref().partition_expr.as_ref(),
378        )?;
379        // Skip auto convert when:
380        // - compaction is enabled
381        // - region partition expr is same with file partition expr (no need to auto convert)
382        let skip_auto_convert = self.compaction && is_same_region_partition;
383
384        // Build a compaction projection helper when:
385        // - compaction is enabled
386        // - region partition expr differs from file partition expr
387        // - flat format is enabled
388        // - primary key encoding is sparse
389        //
390        // This is applied after row-group filtering to align batches with flat output schema
391        // before compat handling.
392        let compaction_projection_mapper = if self.compaction
393            && !is_same_region_partition
394            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
395        {
396            Some(CompactionProjectionMapper::try_new(&region_meta)?)
397        } else {
398            None
399        };
400
401        let read_cols = if let Some(read_cols) = &self.read_cols {
402            read_cols.clone()
403        } else {
404            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
405            // Lists all column ids to read, we always use the expected metadata if possible.
406            ReadColumns::from_deduped_column_ids(
407                expected_meta
408                    .column_metadatas
409                    .iter()
410                    .map(|col| col.column_id),
411            )
412        };
413
414        let file_metadata = parquet_meta.file_metadata();
415        let parquet_schema_desc = file_metadata.schema_descr();
416        let file_schema =
417            parquet_to_arrow_schema(parquet_schema_desc, file_metadata.key_value_metadata())
418                .context(ParquetToArrowSchemaSnafu { file: &file_path })?;
419        let mut read_format = FlatReadFormat::new(
420            region_meta.clone(),
421            read_cols,
422            Some(Arc::new(file_schema)),
423            &file_path,
424            skip_auto_convert,
425        )?;
426        if need_override_sequence(&parquet_meta) {
427            read_format
428                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
429        }
430
431        // Computes the projection mask.
432        let parquet_read_cols = read_format.parquet_read_columns();
433        let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
434        let selection = self
435            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
436            .await;
437
438        if selection.is_empty() {
439            metrics.build_cost += start.elapsed();
440            return Ok(None);
441        }
442
443        let prune_schema = self
444            .expected_metadata
445            .as_ref()
446            .map(|meta| meta.schema.clone())
447            .unwrap_or_else(|| region_meta.schema.clone());
448
449        let dyn_filters = if let Some(predicate) = &self.predicate {
450            predicate.dyn_filters().as_ref().clone()
451        } else {
452            vec![]
453        };
454
455        let codec = build_primary_key_codec(read_format.metadata());
456
457        let filter_plan = build_reader_filter_plan(
458            self.predicate.as_ref(),
459            self.expected_metadata.as_deref(),
460            self.pre_filter_mode,
461            &read_format,
462            &codec,
463        );
464
465        if self.defer_optional_page_index
466            && self.page_index_policy == PageIndexPolicy::Optional
467            && (filter_plan.prefilter_builder.is_some()
468                || has_row_level_selection(&selection, &parquet_meta))
469        {
470            let (sst_meta, page_index_cache_miss) = self
471                .read_parquet_metadata(
472                    &file_path,
473                    file_size,
474                    &mut metrics.metadata_cache_metrics,
475                    PageIndexPolicy::Optional,
476                )
477                .await?;
478            parquet_meta = sst_meta.parquet_metadata();
479            cache_miss |= page_index_cache_miss;
480        }
481
482        // Trigger background download if metadata had a cache miss and selection is not empty
483        if cache_miss && !selection.is_empty() {
484            use crate::cache::file_cache::{FileType, IndexKey};
485            let index_key = IndexKey::new(
486                self.file_handle.region_id(),
487                self.file_handle.file_id().file_id(),
488                FileType::Parquet,
489            );
490            self.cache_strategy.maybe_download_background(
491                index_key,
492                file_path.clone(),
493                self.object_store.clone(),
494                file_size,
495            );
496        }
497
498        // Create ArrowReaderMetadata for async stream building.
499        let mut arrow_reader_options = ArrowReaderOptions::new();
500        if !read_format.arrow_schema().has_json_extension_field() {
501            arrow_reader_options =
502                arrow_reader_options.with_schema(read_format.arrow_schema().clone());
503        }
504        let arrow_metadata =
505            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
506                .context(ReadDataPartSnafu)?;
507
508        let output_schema = read_format.output_arrow_schema()?;
509
510        let reader_builder = RowGroupReaderBuilder {
511            file_handle: self.file_handle.clone(),
512            file_path,
513            parquet_meta,
514            arrow_metadata,
515            output_schema,
516            object_store: self.object_store.clone(),
517            projection: projection_plan,
518            has_nested_projection: parquet_read_cols.has_nested(),
519            cache_strategy: self.cache_strategy.clone(),
520            prefilter_builder: filter_plan.prefilter_builder,
521        };
522
523        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
524
525        let context = FileRangeContext::new(
526            reader_builder,
527            RangeBase {
528                filters: filter_plan.remaining_simple_filters,
529                dyn_filters,
530                read_format,
531                expected_metadata: self.expected_metadata.clone(),
532                prune_schema,
533                codec,
534                compat_batch: None,
535                compaction_projection_mapper,
536                pre_filter_mode: self.pre_filter_mode,
537                partition_filter,
538            },
539        );
540
541        metrics.build_cost += start.elapsed();
542
543        Ok(Some((context, selection)))
544    }
545
546    fn is_same_region_partition(
547        region_partition_expr_str: Option<&str>,
548        file_partition_expr: Option<&PartitionExpr>,
549    ) -> Result<(Option<PartitionExpr>, bool)> {
550        let region_partition_expr = match region_partition_expr_str {
551            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
552            None => None,
553        };
554
555        let is_same = region_partition_expr.as_ref() == file_partition_expr;
556        Ok((region_partition_expr, is_same))
557    }
558
559    /// Compare partition expressions from expected metadata and file metadata,
560    /// and build a partition filter if they differ.
561    fn build_partition_filter(
562        &self,
563        read_format: &FlatReadFormat,
564        prune_schema: &Arc<datatypes::schema::Schema>,
565    ) -> Result<Option<PartitionFilterContext>> {
566        let region_partition_expr_str = self
567            .expected_metadata
568            .as_ref()
569            .and_then(|meta| meta.partition_expr.as_ref());
570        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
571
572        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
573            region_partition_expr_str.map(|s| s.as_str()),
574            file_partition_expr_ref,
575        )?;
576
577        if is_same_region_partition {
578            return Ok(None);
579        }
580
581        let Some(region_partition_expr) = region_partition_expr else {
582            return Ok(None);
583        };
584
585        // Collect columns referenced by the partition expression.
586        let mut referenced_columns = HashSet::new();
587        region_partition_expr.collect_column_names(&mut referenced_columns);
588
589        // Build a partition_schema containing only referenced columns.
590        let partition_schema = Arc::new(datatypes::schema::Schema::new(
591            prune_schema
592                .column_schemas()
593                .iter()
594                .filter(|col| referenced_columns.contains(&col.name))
595                .map(|col| {
596                    if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
597                        && column_meta.semantic_type == SemanticType::Tag
598                        && col.data_type.is_string()
599                    {
600                        let field = Arc::new(Field::new(
601                            &col.name,
602                            col.data_type.as_arrow_type(),
603                            col.is_nullable(),
604                        ));
605                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
606                        let mut column = col.clone();
607                        column.data_type =
608                            ConcreteDataType::from_arrow_type(dict_field.data_type());
609                        return column;
610                    }
611
612                    col.clone()
613                })
614                .collect::<Vec<_>>(),
615        ));
616
617        let region_partition_physical_expr = region_partition_expr
618            .try_as_physical_expr(partition_schema.arrow_schema())
619            .context(SerializePartitionExprSnafu)?;
620
621        Ok(Some(PartitionFilterContext {
622            region_partition_physical_expr,
623            partition_schema,
624        }))
625    }
626
627    /// Reads parquet metadata of specific file.
628    /// Returns (fused metadata, cache_miss_flag).
629    pub(crate) async fn read_parquet_metadata(
630        &self,
631        file_path: &str,
632        file_size: u64,
633        cache_metrics: &mut MetadataCacheMetrics,
634        page_index_policy: PageIndexPolicy,
635    ) -> Result<(Arc<CachedSstMeta>, bool)> {
636        let start = Instant::now();
637        let _t = READ_STAGE_ELAPSED
638            .with_label_values(&["read_parquet_metadata"])
639            .start_timer();
640
641        let file_id = self.file_handle.file_id();
642        // Tries to get from cache with metrics tracking.
643        if let Some(metadata) = self
644            .cache_strategy
645            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
646            .await
647        {
648            cache_metrics.metadata_load_cost += start.elapsed();
649            return Ok((metadata, false));
650        }
651
652        // Cache miss, load metadata directly.
653        let mut metadata_loader =
654            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
655        metadata_loader.with_page_index_policy(page_index_policy);
656        let metadata = metadata_loader.load(cache_metrics).await?;
657
658        let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
659            file_path,
660            metadata,
661            None,
662            page_index_policy,
663        )?);
664        // Cache the metadata.
665        self.cache_strategy
666            .put_sst_meta_data(file_id, metadata.clone());
667
668        cache_metrics.metadata_load_cost += start.elapsed();
669        Ok((metadata, true))
670    }
671
672    /// Computes row groups to read, along with their respective row selections.
673    #[tracing::instrument(
674        skip_all,
675        fields(
676            region_id = %self.file_handle.region_id(),
677            file_id = %self.file_handle.file_id()
678        )
679    )]
680    async fn row_groups_to_read(
681        &self,
682        read_format: &FlatReadFormat,
683        parquet_meta: &ParquetMetaData,
684        metrics: &mut ReaderFilterMetrics,
685    ) -> RowGroupSelection {
686        let num_row_groups = parquet_meta.num_row_groups();
687        let num_rows = parquet_meta.file_metadata().num_rows();
688        if num_row_groups == 0 || num_rows == 0 {
689            return RowGroupSelection::default();
690        }
691
692        // Let's assume that the number of rows in the first row group
693        // can represent the `row_group_size` of the Parquet file.
694        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
695        if row_group_size == 0 {
696            return RowGroupSelection::default();
697        }
698
699        metrics.rg_total += num_row_groups;
700        metrics.rows_total += num_rows as usize;
701
702        // Compute skip_fields once for all pruning operations
703        let skip_fields = self.pre_filter_mode.skip_fields();
704
705        let mut output = self.row_groups_by_minmax(
706            read_format,
707            parquet_meta,
708            row_group_size,
709            num_rows as usize,
710            metrics,
711            skip_fields,
712        );
713        if output.is_empty() {
714            return output;
715        }
716
717        let fulltext_filtered = self
718            .prune_row_groups_by_fulltext_index(
719                row_group_size,
720                num_row_groups,
721                &mut output,
722                metrics,
723                skip_fields,
724            )
725            .await;
726        if output.is_empty() {
727            return output;
728        }
729
730        self.prune_row_groups_by_inverted_index(
731            read_format.metadata(),
732            row_group_size,
733            num_row_groups,
734            &mut output,
735            metrics,
736            skip_fields,
737        )
738        .await;
739        if output.is_empty() {
740            return output;
741        }
742
743        self.prune_row_groups_by_bloom_filter(
744            read_format.metadata(),
745            row_group_size,
746            parquet_meta,
747            &mut output,
748            metrics,
749            skip_fields,
750        )
751        .await;
752        if output.is_empty() {
753            return output;
754        }
755
756        if !fulltext_filtered {
757            self.prune_row_groups_by_fulltext_bloom(
758                row_group_size,
759                parquet_meta,
760                &mut output,
761                metrics,
762                skip_fields,
763            )
764            .await;
765        }
766        #[cfg(feature = "vector_index")]
767        {
768            self.prune_row_groups_by_vector_index(
769                row_group_size,
770                num_row_groups,
771                &mut output,
772                metrics,
773            )
774            .await;
775            if output.is_empty() {
776                return output;
777            }
778        }
779        output
780    }
781
782    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
783    async fn prune_row_groups_by_fulltext_index(
784        &self,
785        row_group_size: usize,
786        num_row_groups: usize,
787        output: &mut RowGroupSelection,
788        metrics: &mut ReaderFilterMetrics,
789        skip_fields: bool,
790    ) -> bool {
791        if !self.file_handle.meta_ref().fulltext_index_available() {
792            return false;
793        }
794
795        let mut pruned = false;
796        // If skip_fields is true, only apply the first applier (for tags).
797        let appliers = if skip_fields {
798            &self.fulltext_index_appliers[..1]
799        } else {
800            &self.fulltext_index_appliers[..]
801        };
802        for index_applier in appliers.iter().flatten() {
803            let predicate_key = index_applier.predicate_key();
804            // Fast path: return early if the result is in the cache.
805            let cached = self
806                .cache_strategy
807                .index_result_cache()
808                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
809            if let Some(result) = cached.as_ref()
810                && all_required_row_groups_searched(output, result)
811            {
812                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
813                metrics.fulltext_index_cache_hit += 1;
814                pruned = true;
815                continue;
816            }
817
818            // Slow path: apply the index from the file.
819            metrics.fulltext_index_cache_miss += 1;
820            let file_size_hint = self.file_handle.meta_ref().index_file_size();
821            let apply_res = index_applier
822                .apply_fine(
823                    self.file_handle.index_id(),
824                    Some(file_size_hint),
825                    metrics.fulltext_index_apply_metrics.as_mut(),
826                )
827                .await;
828            let selection = match apply_res {
829                Ok(Some(res)) => {
830                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
831                }
832                Ok(None) => continue,
833                Err(err) => {
834                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
835                    continue;
836                }
837            };
838
839            self.apply_index_result_and_update_cache(
840                predicate_key,
841                self.file_handle.file_id().file_id(),
842                selection,
843                output,
844                metrics,
845                INDEX_TYPE_FULLTEXT,
846            );
847            pruned = true;
848        }
849        pruned
850    }
851
852    /// Applies index to prune row groups.
853    ///
854    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
855    /// as an escape route in case of index issues, and it can be used to test
856    /// the correctness of the index.
857    async fn prune_row_groups_by_inverted_index(
858        &self,
859        sst_metadata: &RegionMetadataRef,
860        row_group_size: usize,
861        num_row_groups: usize,
862        output: &mut RowGroupSelection,
863        metrics: &mut ReaderFilterMetrics,
864        skip_fields: bool,
865    ) -> bool {
866        if !self.file_handle.meta_ref().inverted_index_available() {
867            return false;
868        }
869
870        let mut pruned = false;
871        // If skip_fields is true, only apply the first applier (for tags).
872        let appliers = if skip_fields {
873            &self.inverted_index_appliers[..1]
874        } else {
875            &self.inverted_index_appliers[..]
876        };
877        for index_applier in appliers.iter().flatten() {
878            let Ok(Some(plan)) = index_applier
879                .plan_for_sst(sst_metadata)
880                .inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
881            else {
882                continue;
883            };
884
885            // Fast path: return early if the result is in the cache.
886            let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
887                let file_id = self.file_handle.file_id().file_id();
888                cache.get(&plan.predicate_key, file_id)
889            });
890
891            if let Some(result) = cached.as_ref()
892                && all_required_row_groups_searched(output, result)
893            {
894                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
895                metrics.inverted_index_cache_hit += 1;
896                pruned = true;
897                continue;
898            }
899
900            // Slow path: apply the index from the file.
901            metrics.inverted_index_cache_miss += 1;
902            let file_size_hint = self.file_handle.meta_ref().index_file_size();
903            let apply_res = index_applier
904                .apply(
905                    self.file_handle.index_id(),
906                    Some(file_size_hint),
907                    &plan.index_applier,
908                    metrics.inverted_index_apply_metrics.as_mut(),
909                )
910                .await;
911
912            let selection = match apply_res {
913                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
914                    row_group_size,
915                    num_row_groups,
916                    apply_output,
917                ),
918                Err(err) => {
919                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
920                    continue;
921                }
922            };
923
924            self.apply_index_result_and_update_cache(
925                &plan.predicate_key,
926                self.file_handle.file_id().file_id(),
927                selection,
928                output,
929                metrics,
930                INDEX_TYPE_INVERTED,
931            );
932            pruned = true;
933        }
934        pruned
935    }
936
    /// Prunes row groups in `output` with the bloom filter index appliers.
    ///
    /// For each applier, the predicates compatible with `sst_metadata` are extracted and
    /// wrapped into a bloom `PredicateKey` used for the index result cache. Appliers with no
    /// compatible predicates are skipped.
    ///
    /// - If the cached selection already covers every non-empty row group in `output`, it
    ///   is applied directly (cache hit).
    /// - Otherwise only the row groups that `output` still needs and that the cached
    ///   selection lacks are searched. The fresh result is concatenated with the cached
    ///   part, intersected into `output`, and written back to the cache.
    ///
    /// Index errors are reported through `handle_index_error!` and the applier is skipped.
    /// When `skip_fields` is true only the first applier slot (the one for tags) is used.
    ///
    /// Returns `true` if at least one applier narrowed `output`.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        sst_metadata: &RegionMetadataRef,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            // Skip appliers whose predicates do not apply to this SST's schema.
            let Some(compatible_predicates) =
                index_applier.compatible_predicate_for_sst(sst_metadata)
            else {
                continue;
            };
            let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone());
            // Fast path: reuse the cached result if it covers every row group `output` still needs.
            let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
                let file_id = self.file_handle.file_id().file_id();
                cache.get(&predicate_key, file_id)
            });
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // `rgs` is a lazy iterator of (row count, should_search) per row group. It reads
            // `output` and `cached` when iterated, which can only happen before `output` is
            // mutated further below.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    &compatible_predicates,
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // New searched row groups are added to `selection`, concat them with `cached`.
            // The merged selection is what gets cached, so later queries can reuse both parts.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                &predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }
1028
1029    /// Prunes row groups by vector index results.
1030    #[cfg(feature = "vector_index")]
1031    async fn prune_row_groups_by_vector_index(
1032        &self,
1033        row_group_size: usize,
1034        num_row_groups: usize,
1035        output: &mut RowGroupSelection,
1036        metrics: &mut ReaderFilterMetrics,
1037    ) {
1038        let Some(applier) = &self.vector_index_applier else {
1039            return;
1040        };
1041        let Some(k) = self.vector_index_k else {
1042            return;
1043        };
1044        if !self.file_handle.meta_ref().vector_index_available() {
1045            return;
1046        }
1047
1048        let file_size_hint = self.file_handle.meta_ref().index_file_size();
1049        let apply_res = applier
1050            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
1051            .await;
1052        let row_ids = match apply_res {
1053            Ok(res) => res.row_offsets,
1054            Err(err) => {
1055                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1056                return;
1057            }
1058        };
1059
1060        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
1061        {
1062            Ok(selection) => selection,
1063            Err(err) => {
1064                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1065                return;
1066            }
1067        };
1068        metrics.rows_vector_selected += selection.row_count();
1069        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
1070    }
1071
    /// Prunes row groups in `output` with the fulltext index appliers' coarse path
    /// (`apply_coarse`), searching per row group.
    ///
    /// Results are cached under each applier's `predicate_key()`. This is the same key that
    /// `prune_row_groups_by_fulltext_index` uses for its fine-grained results.
    ///
    /// - If the cached selection already covers every non-empty row group in `output`, it
    ///   is applied directly (cache hit).
    /// - Otherwise only the row groups that `output` still needs and that the cached
    ///   selection lacks are searched. The fresh result is concatenated with the cached
    ///   part, intersected into `output`, and written back to the cache.
    ///
    /// Appliers returning `None` are skipped. Errors are reported through
    /// `handle_index_error!` and the applier is skipped. When `skip_fields` is true only the
    /// first applier slot (the one for tags) is used.
    ///
    /// Returns `true` if at least one applier narrowed `output`.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse the cached result if it covers every row group `output` still needs.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // `rgs` is a lazy iterator of (row count, should_search) per row group. It reads
            // `output` and `cached` when iterated, which can only happen before `output` is
            // mutated further below.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // New searched row groups are added to `selection`, concat them with `cached`.
            // The merged selection is what gets cached, so later queries can reuse both parts.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
1157
1158    /// Computes row groups selection after min-max pruning.
1159    fn row_groups_by_minmax(
1160        &self,
1161        read_format: &FlatReadFormat,
1162        parquet_meta: &ParquetMetaData,
1163        row_group_size: usize,
1164        total_row_count: usize,
1165        metrics: &mut ReaderFilterMetrics,
1166        skip_fields: bool,
1167    ) -> RowGroupSelection {
1168        let Some(predicate) = &self.predicate else {
1169            return RowGroupSelection::new(row_group_size, total_row_count);
1170        };
1171
1172        let file_id = self.file_handle.file_id().file_id();
1173        let index_result_cache = self.cache_strategy.index_result_cache();
1174        let cached_minmax_key =
1175            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
1176                // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
1177                // building row-group pruning stats for identical predicates across queries.
1178                let mut exprs = predicate
1179                    .exprs()
1180                    .iter()
1181                    .map(|expr| format!("{expr:?}"))
1182                    .collect::<Vec<_>>();
1183                exprs.sort();
1184                let schema_version = self
1185                    .expected_metadata
1186                    .as_ref()
1187                    .map(|meta| meta.schema_version)
1188                    .unwrap_or_else(|| read_format.metadata().schema_version);
1189                Some(PredicateKey::new_minmax(
1190                    Arc::new(exprs),
1191                    schema_version,
1192                    skip_fields,
1193                ))
1194            } else {
1195                None
1196            };
1197
1198        if let Some(index_result_cache) = index_result_cache
1199            && let Some(predicate_key) = cached_minmax_key.as_ref()
1200        {
1201            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
1202                metrics.minmax_cache_hit += 1;
1203                let num_row_groups = parquet_meta.num_row_groups();
1204                metrics.rg_minmax_filtered +=
1205                    num_row_groups.saturating_sub(result.row_group_count());
1206                return (*result).clone();
1207            }
1208
1209            metrics.minmax_cache_miss += 1;
1210        }
1211
1212        let region_meta = read_format.metadata();
1213        let row_groups = parquet_meta.row_groups();
1214        let stats = RowGroupPruningStats::new(
1215            row_groups,
1216            read_format,
1217            self.expected_metadata.clone(),
1218            skip_fields,
1219        );
1220        let prune_schema = self
1221            .expected_metadata
1222            .as_ref()
1223            .map(|meta| meta.schema.arrow_schema())
1224            .unwrap_or_else(|| region_meta.schema.arrow_schema());
1225
1226        // Here we use the schema of the SST to build the physical expression. If the column
1227        // in the SST doesn't have the same column id as the column in the expected metadata,
1228        // we will get a None statistics for that column.
1229        let mask = predicate.prune_with_stats(&stats, prune_schema);
1230        let output = RowGroupSelection::from_full_row_group_ids(
1231            mask.iter()
1232                .enumerate()
1233                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
1234            row_group_size,
1235            total_row_count,
1236        );
1237
1238        metrics.rg_minmax_filtered += parquet_meta
1239            .num_row_groups()
1240            .saturating_sub(output.row_group_count());
1241
1242        if let Some(index_result_cache) = index_result_cache
1243            && let Some(predicate_key) = cached_minmax_key
1244        {
1245            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
1246        }
1247
1248        output
1249    }
1250
1251    fn apply_index_result_and_update_cache(
1252        &self,
1253        predicate_key: &PredicateKey,
1254        file_id: FileId,
1255        result: RowGroupSelection,
1256        output: &mut RowGroupSelection,
1257        metrics: &mut ReaderFilterMetrics,
1258        index_type: &str,
1259    ) {
1260        apply_selection_and_update_metrics(output, &result, metrics, index_type);
1261
1262        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1263            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1264        }
1265    }
1266}
1267
1268fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
1269    selection.iter().any(|(row_group_idx, row_selection)| {
1270        let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else {
1271            return false;
1272        };
1273
1274        row_selection.row_count() != row_group.num_rows() as usize
1275            || row_selection.iter().any(|selector| selector.skip)
1276    })
1277}
1278
1279fn apply_selection_and_update_metrics(
1280    output: &mut RowGroupSelection,
1281    result: &RowGroupSelection,
1282    metrics: &mut ReaderFilterMetrics,
1283    index_type: &str,
1284) {
1285    let intersection = output.intersect(result);
1286
1287    let row_group_count = output.row_group_count() - intersection.row_group_count();
1288    let row_count = output.row_count() - intersection.row_count();
1289
1290    metrics.update_index_metrics(index_type, row_group_count, row_count);
1291
1292    *output = intersection;
1293}
1294
/// Converts vector-index row offsets into a [`RowGroupSelection`].
///
/// Offsets are de-duplicated and ordered through a `BTreeSet<u32>`. Fails on the first
/// offset that does not fit into a `u32`.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<u32>>>()?;

    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1317
1318fn all_required_row_groups_searched(
1319    required_row_groups: &RowGroupSelection,
1320    cached_row_groups: &RowGroupSelection,
1321) -> bool {
1322    required_row_groups.iter().all(|(rg_id, _)| {
1323        // Row group with no rows is not required to search.
1324        !required_row_groups.contains_non_empty_row_group(*rg_id)
1325            // The row group is already searched.
1326            || cached_row_groups.contains_row_group(*rg_id)
1327    })
1328}
1329
/// Metrics of filtering row groups and rows.
///
/// The `rg_*_filtered` / `rows_*_filtered` counters record how many row groups / rows each
/// pruning step removed. `merge_from` sums two instances, and `observe` reports the
/// row-group and row counters to the global metric counters.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index (not reported by `observe`).
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    pub(crate) minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    pub(crate) minmax_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
}
1392
1393impl ReaderFilterMetrics {
1394    /// Adds `other` metrics to this metrics.
1395    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1396        self.rg_total += other.rg_total;
1397        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1398        self.rg_inverted_filtered += other.rg_inverted_filtered;
1399        self.rg_minmax_filtered += other.rg_minmax_filtered;
1400        self.rg_bloom_filtered += other.rg_bloom_filtered;
1401        self.rg_vector_filtered += other.rg_vector_filtered;
1402
1403        self.rows_total += other.rows_total;
1404        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1405        self.rows_inverted_filtered += other.rows_inverted_filtered;
1406        self.rows_bloom_filtered += other.rows_bloom_filtered;
1407        self.rows_vector_filtered += other.rows_vector_filtered;
1408        self.rows_vector_selected += other.rows_vector_selected;
1409        self.rows_precise_filtered += other.rows_precise_filtered;
1410
1411        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1412        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1413        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1414        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1415        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1416        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1417        self.minmax_cache_hit += other.minmax_cache_hit;
1418        self.minmax_cache_miss += other.minmax_cache_miss;
1419
1420        self.pruner_cache_hit += other.pruner_cache_hit;
1421        self.pruner_cache_miss += other.pruner_cache_miss;
1422        self.pruner_prune_cost += other.pruner_prune_cost;
1423
1424        // Merge optional applier metrics
1425        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1426            self.inverted_index_apply_metrics
1427                .get_or_insert_with(Default::default)
1428                .merge_from(other_metrics);
1429        }
1430        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1431            self.bloom_filter_apply_metrics
1432                .get_or_insert_with(Default::default)
1433                .merge_from(other_metrics);
1434        }
1435        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1436            self.fulltext_index_apply_metrics
1437                .get_or_insert_with(Default::default)
1438                .merge_from(other_metrics);
1439        }
1440    }
1441
1442    /// Reports metrics.
1443    pub(crate) fn observe(&self) {
1444        READ_ROW_GROUPS_TOTAL
1445            .with_label_values(&["before_filtering"])
1446            .inc_by(self.rg_total as u64);
1447        READ_ROW_GROUPS_TOTAL
1448            .with_label_values(&["fulltext_index_filtered"])
1449            .inc_by(self.rg_fulltext_filtered as u64);
1450        READ_ROW_GROUPS_TOTAL
1451            .with_label_values(&["inverted_index_filtered"])
1452            .inc_by(self.rg_inverted_filtered as u64);
1453        READ_ROW_GROUPS_TOTAL
1454            .with_label_values(&["minmax_index_filtered"])
1455            .inc_by(self.rg_minmax_filtered as u64);
1456        READ_ROW_GROUPS_TOTAL
1457            .with_label_values(&["bloom_filter_index_filtered"])
1458            .inc_by(self.rg_bloom_filtered as u64);
1459        READ_ROW_GROUPS_TOTAL
1460            .with_label_values(&["vector_index_filtered"])
1461            .inc_by(self.rg_vector_filtered as u64);
1462
1463        PRECISE_FILTER_ROWS_TOTAL
1464            .with_label_values(&["parquet"])
1465            .inc_by(self.rows_precise_filtered as u64);
1466        READ_ROWS_IN_ROW_GROUP_TOTAL
1467            .with_label_values(&["before_filtering"])
1468            .inc_by(self.rows_total as u64);
1469        READ_ROWS_IN_ROW_GROUP_TOTAL
1470            .with_label_values(&["fulltext_index_filtered"])
1471            .inc_by(self.rows_fulltext_filtered as u64);
1472        READ_ROWS_IN_ROW_GROUP_TOTAL
1473            .with_label_values(&["inverted_index_filtered"])
1474            .inc_by(self.rows_inverted_filtered as u64);
1475        READ_ROWS_IN_ROW_GROUP_TOTAL
1476            .with_label_values(&["bloom_filter_index_filtered"])
1477            .inc_by(self.rows_bloom_filtered as u64);
1478        READ_ROWS_IN_ROW_GROUP_TOTAL
1479            .with_label_values(&["vector_index_filtered"])
1480            .inc_by(self.rows_vector_filtered as u64);
1481    }
1482
1483    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1484        match index_type {
1485            INDEX_TYPE_FULLTEXT => {
1486                self.rg_fulltext_filtered += row_group_count;
1487                self.rows_fulltext_filtered += row_count;
1488            }
1489            INDEX_TYPE_INVERTED => {
1490                self.rg_inverted_filtered += row_group_count;
1491                self.rows_inverted_filtered += row_count;
1492            }
1493            INDEX_TYPE_BLOOM => {
1494                self.rg_bloom_filtered += row_group_count;
1495                self.rows_bloom_filtered += row_count;
1496            }
1497            INDEX_TYPE_VECTOR => {
1498                self.rg_vector_filtered += row_group_count;
1499                self.rows_vector_filtered += row_count;
1500            }
1501            _ => {}
1502        }
1503    }
1504}
1505
/// Unit tests for converting vector-index row offsets into row-group selections and for
/// recording vector-index pruning metrics.
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    /// Offsets spread across three row groups of size 4 select one row in each group.
    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    /// An offset that does not fit into a `u32` makes the conversion fail.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    /// Applying a one-row vector selection to a full 2x4 selection records the pruned row
    /// group and rows under the vector index counters.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1557
/// Metrics for parquet metadata cache operations.
///
/// The `Debug` output is a compact JSON-like object. It always contains
/// `metadata_load_cost`, and each counter appears only when it is non-zero. The object is
/// rendered as `{}` when no load cost was recorded.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations performed.
    pub(crate) num_reads: usize,
    /// Total bytes read from storage.
    pub(crate) bytes_read: u64,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            return f.write_str("{}");
        }

        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *self;

        write!(f, "{{\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        // Counters are printed in a fixed order and only when non-zero.
        let counters: [(&str, u64); 5] = [
            ("mem_cache_hit", mem_cache_hit as u64),
            ("file_cache_hit", file_cache_hit as u64),
            ("cache_miss", cache_miss as u64),
            ("num_reads", num_reads as u64),
            ("bytes_read", bytes_read),
        ];
        for (name, value) in counters {
            if value > 0 {
                write!(f, ", \"{name}\":{value}")?;
            }
        }

        f.write_str("}")
    }
}

impl MetadataCacheMetrics {
    /// Returns `true` when no metadata load time was recorded.
    ///
    /// The counters are not considered; a zero load cost alone makes the metrics empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost == Duration::ZERO
    }

    /// Adds every counter and the load cost of `other` into `self`.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        let MetadataCacheMetrics {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *other;

        self.mem_cache_hit += mem_cache_hit;
        self.file_cache_hit += file_cache_hit;
        self.cache_miss += cache_miss;
        self.metadata_load_cost += metadata_load_cost;
        self.num_reads += num_reads;
        self.bytes_read += bytes_read;
    }
}
1629
1630/// Parquet reader metrics.
1631#[derive(Debug, Default, Clone)]
1632pub struct ReaderMetrics {
1633    /// Filtered row groups and rows metrics.
1634    pub(crate) filter_metrics: ReaderFilterMetrics,
1635    /// Duration to build the parquet reader.
1636    pub(crate) build_cost: Duration,
1637    /// Duration to scan the reader.
1638    pub(crate) scan_cost: Duration,
1639    /// Number of record batches read.
1640    pub(crate) num_record_batches: usize,
1641    /// Number of batches decoded.
1642    pub(crate) num_batches: usize,
1643    /// Number of rows read.
1644    pub(crate) num_rows: usize,
1645    /// Metrics for parquet metadata cache.
1646    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
1647    /// Optional metrics for page/row group fetch operations.
1648    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
1649    /// Memory size of metadata loaded for building file ranges.
1650    pub(crate) metadata_mem_size: isize,
1651    /// Number of file range builders created.
1652    pub(crate) num_range_builders: isize,
1653}
1654
1655impl ReaderMetrics {
1656    /// Adds `other` metrics to this metrics.
1657    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1658        self.filter_metrics.merge_from(&other.filter_metrics);
1659        self.build_cost += other.build_cost;
1660        self.scan_cost += other.scan_cost;
1661        self.num_record_batches += other.num_record_batches;
1662        self.num_batches += other.num_batches;
1663        self.num_rows += other.num_rows;
1664        self.metadata_cache_metrics
1665            .merge_from(&other.metadata_cache_metrics);
1666        if let Some(other_fetch) = &other.fetch_metrics {
1667            if let Some(self_fetch) = &self.fetch_metrics {
1668                self_fetch.merge_from(other_fetch);
1669            } else {
1670                self.fetch_metrics = Some(other_fetch.clone());
1671            }
1672        }
1673        self.metadata_mem_size += other.metadata_mem_size;
1674        self.num_range_builders += other.num_range_builders;
1675    }
1676
1677    /// Reports total rows.
1678    pub(crate) fn observe_rows(&self, read_type: &str) {
1679        READ_ROWS_TOTAL
1680            .with_label_values(&[read_type])
1681            .inc_by(self.num_rows as u64);
1682    }
1683}
1684
1685/// Builder to build a parquet record batch stream for a row group.
1686pub(crate) struct RowGroupReaderBuilder {
1687    /// SST file to read.
1688    ///
1689    /// Holds the file handle to avoid the file purge it.
1690    file_handle: FileHandle,
1691    /// Path of the file.
1692    file_path: String,
1693    /// Metadata of the parquet file.
1694    parquet_meta: Arc<ParquetMetaData>,
1695    /// Arrow reader metadata for building async stream.
1696    arrow_metadata: ArrowReaderMetadata,
1697    /// Projected output schema aligned with `projection.projected_root_presence`.
1698    output_schema: SchemaRef,
1699    /// Object store as an Operator.
1700    object_store: ObjectStore,
1701    /// Projection mask.
1702    projection: ProjectionMaskPlan,
1703    /// Whether projected read columns include nested paths.
1704    has_nested_projection: bool,
1705    /// Cache.
1706    cache_strategy: CacheStrategy,
1707    /// Pre-built prefilter state. `None` if prefiltering is not applicable.
1708    prefilter_builder: Option<PrefilterContextBuilder>,
1709}
1710
1711/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
1712/// needed for prefiltering decisions.
1713pub(crate) struct RowGroupBuildContext<'a> {
1714    /// Index of the row group to read.
1715    pub(crate) row_group_idx: usize,
1716    /// Row selection for the row group. `None` means all rows.
1717    pub(crate) row_selection: Option<RowSelection>,
1718    /// Metrics for tracking fetch operations.
1719    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
1720}
1721
1722impl RowGroupReaderBuilder {
1723    /// Path of the file to read.
1724    pub(crate) fn file_path(&self) -> &str {
1725        &self.file_path
1726    }
1727
1728    /// Handle of the file to read.
1729    pub(crate) fn file_handle(&self) -> &FileHandle {
1730        &self.file_handle
1731    }
1732
1733    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1734        &self.parquet_meta
1735    }
1736
1737    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1738        &self.cache_strategy
1739    }
1740
1741    pub(crate) fn has_predicate_prefilter(&self) -> bool {
1742        self.prefilter_builder.is_some()
1743    }
1744
1745    /// Builds a parquet record batch stream to read the row group at `row_group_idx`.
1746    ///
1747    /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
1748    /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
1749    /// 2. Reads the full projection with the refined row selection
1750    ///
1751    /// The prefilter pass is *best-effort pruning*, not the precise filter for the query.
1752    /// Predicates that cannot be lowered to prefilter columns (column not projected,
1753    /// expression not supported, etc.) are silently skipped. Correctness rests on the
1754    /// DataFusion `FilterExec` above this reader, which always re-applies the original
1755    /// predicate. Tag and timestamp predicates that flow through [`SimpleFilterEvaluator`]
1756    /// are an exception — the engine enforces them precisely, so the prefilter pass is the
1757    /// only place they execute. See [`build_reader_filter_plan`] for the bucketing rules.
1758    ///
1759    /// When the prefilter result selects no rows, the second read still issues but
1760    /// parquet-rs short-circuits before any column-chunk IO: the row-group state machine
1761    /// jumps to `Finished` once it sees `num_rows_selected() == 0`, so no fast path is
1762    /// added here.
1763    pub(crate) async fn build(
1764        &self,
1765        build_ctx: RowGroupBuildContext<'_>,
1766    ) -> Result<ProjectedRecordBatchStream> {
1767        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1768
1769        let Some(mut prefilter_ctx) = prefilter_ctx else {
1770            // No prefilter applicable, build stream with full projection.
1771            let stream = self
1772                .build_with_projection(
1773                    build_ctx.row_group_idx,
1774                    build_ctx.row_selection,
1775                    self.projection.mask.clone(),
1776                    build_ctx.fetch_metrics,
1777                )
1778                .await?;
1779            return self.make_projected_stream(stream);
1780        };
1781
1782        let prefilter_start = Instant::now();
1783        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1784        if let Some(metrics) = build_ctx.fetch_metrics {
1785            let mut data = metrics.data.lock().unwrap();
1786            data.prefilter_cost += prefilter_start.elapsed();
1787            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1788        }
1789
1790        let refined_selection = Some(prefilter_result.refined_selection);
1791
1792        let stream = self
1793            .build_with_projection(
1794                build_ctx.row_group_idx,
1795                refined_selection,
1796                self.projection.mask.clone(),
1797                build_ctx.fetch_metrics,
1798            )
1799            .await?;
1800        self.make_projected_stream(stream)
1801    }
1802
1803    fn make_projected_stream(
1804        &self,
1805        stream: ProjectedRecordBatchStream,
1806    ) -> Result<ProjectedRecordBatchStream> {
1807        if !self.has_nested_projection {
1808            return Ok(stream);
1809        }
1810
1811        Ok(NestedSchemaAligner::new(
1812            stream,
1813            self.projection.projected_root_presence.clone(),
1814            self.output_schema.clone(),
1815        )?
1816        .boxed())
1817    }
1818
1819    /// Builds a parquet record batch stream with a custom projection mask.
1820    pub(crate) async fn build_with_projection(
1821        &self,
1822        row_group_idx: usize,
1823        row_selection: Option<RowSelection>,
1824        projection: ProjectionMask,
1825        fetch_metrics: Option<&ParquetFetchMetrics>,
1826    ) -> Result<ProjectedRecordBatchStream> {
1827        let range_fetcher = SstParquetRangeFetcher::new(
1828            self.file_handle.file_id(),
1829            self.file_path.clone(),
1830            self.object_store.clone(),
1831            self.cache_strategy.clone(),
1832            row_group_idx,
1833            fetch_metrics.cloned(),
1834        );
1835
1836        build_sst_parquet_record_batch_stream(
1837            self.arrow_metadata.clone(),
1838            row_group_idx,
1839            row_selection,
1840            projection,
1841            range_fetcher,
1842            self.file_path.clone(),
1843        )
1844    }
1845}
1846
1847#[derive(Clone)]
1848/// The filter to evaluate or the prune result of the default value.
1849pub(crate) enum MaybeFilter {
1850    /// The filter to evaluate.
1851    Filter(SimpleFilterEvaluator),
1852    /// The filter matches the default value.
1853    Matched,
1854    /// The filter is pruned.
1855    Pruned,
1856}
1857
1858impl MaybeFilter {
1859    /// Returns the inner filter when it is available.
1860    pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1861        match self {
1862            MaybeFilter::Filter(filter) => Some(filter),
1863            MaybeFilter::Matched | MaybeFilter::Pruned => None,
1864        }
1865    }
1866}
1867
1868#[derive(Clone)]
1869/// Context to evaluate the column filter for a parquet file.
1870pub(crate) struct SimpleFilterContext {
1871    /// Filter to evaluate.
1872    filter: MaybeFilter,
1873    /// Debug string of the original logical expression.
1874    expr_str: String,
1875    /// Id of the column to evaluate.
1876    column_id: ColumnId,
1877    /// Semantic type of the column.
1878    semantic_type: SemanticType,
1879}
1880
1881impl SimpleFilterContext {
1882    /// Creates a context for the `expr`.
1883    ///
1884    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1885    /// expected metadata.
1886    pub(crate) fn new_opt(
1887        sst_meta: &RegionMetadataRef,
1888        expected_meta: Option<&RegionMetadata>,
1889        expr: &Expr,
1890    ) -> Option<Self> {
1891        let filter = SimpleFilterEvaluator::try_new(expr)?;
1892        let expr_str = format!("{expr:?}");
1893        let (column_metadata, maybe_filter) = match expected_meta {
1894            Some(meta) => {
1895                // Gets the column metadata from the expected metadata.
1896                let column = meta.column_by_name(filter.column_name())?;
1897                // Checks if the column is present in the SST metadata. We still uses the
1898                // column from the expected metadata.
1899                match sst_meta.column_by_id(column.column_id) {
1900                    Some(sst_column) => {
1901                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1902                        // Schema evolution can make field columns with the same id have
1903                        // different concrete data types across SSTs. In that case,
1904                        // evaluating this simple filter against current SST column may
1905                        // raise an invalid cross-type comparison error (e.g. Float64 == Utf8).
1906                        let maybe_filter = if sst_column.column_schema.data_type
1907                            == column.column_schema.data_type
1908                        {
1909                            MaybeFilter::Filter(filter)
1910                        } else {
1911                            // Altering tag or timestamp column types is not allowed,
1912                            // so only field columns can reach this branch.
1913                            debug_assert_eq!(column.semantic_type, SemanticType::Field);
1914                            return None;
1915                        };
1916                        (column, maybe_filter)
1917                    }
1918                    None => {
1919                        // If the column is not present in the SST metadata, we evaluate the filter
1920                        // against the default value of the column.
1921                        // If we can't evaluate the filter, we return None.
1922                        if pruned_by_default(&filter, column)? {
1923                            (column, MaybeFilter::Pruned)
1924                        } else {
1925                            (column, MaybeFilter::Matched)
1926                        }
1927                    }
1928                }
1929            }
1930            None => {
1931                let column = sst_meta.column_by_name(filter.column_name())?;
1932                (column, MaybeFilter::Filter(filter))
1933            }
1934        };
1935
1936        Some(Self {
1937            filter: maybe_filter,
1938            expr_str,
1939            column_id: column_metadata.column_id,
1940            semantic_type: column_metadata.semantic_type,
1941        })
1942    }
1943
1944    /// Returns the filter to evaluate.
1945    pub(crate) fn filter(&self) -> &MaybeFilter {
1946        &self.filter
1947    }
1948
1949    /// Returns the original logical expression string.
1950    pub(crate) fn expr_str(&self) -> &str {
1951        &self.expr_str
1952    }
1953
1954    /// Returns the column id.
1955    pub(crate) fn column_id(&self) -> ColumnId {
1956        self.column_id
1957    }
1958
1959    /// Returns the semantic type of the column.
1960    pub(crate) fn semantic_type(&self) -> SemanticType {
1961        self.semantic_type
1962    }
1963}
1964
1965/// Context to evaluate a physical expression for a parquet file.
1966#[derive(Clone)]
1967pub(crate) struct PhysicalFilterContext {
1968    /// Filter to evaluate.
1969    filter: Arc<dyn PhysicalExpr>,
1970    /// Debug string of the original logical expression.
1971    expr_str: String,
1972    /// Id of the column to evaluate.
1973    column_id: ColumnId,
1974    /// Name of the column to evaluate.
1975    column_name: String,
1976    /// Semantic type of the column.
1977    semantic_type: SemanticType,
1978    /// Schema containing only the referenced column.
1979    schema: SchemaRef,
1980    /// Whether the original logical expression is immutable across queries.
1981    immutable: bool,
1982}
1983
1984impl PhysicalFilterContext {
1985    /// Creates a context for the `expr`.
1986    ///
1987    /// Returns None if the expression doesn't reference exactly one column or the
1988    /// column to filter doesn't exist in the SST metadata or the expected metadata.
1989    pub(crate) fn new_opt(
1990        sst_meta: &RegionMetadataRef,
1991        expected_meta: Option<&RegionMetadata>,
1992        read_format: &FlatReadFormat,
1993        expr: &Expr,
1994    ) -> Option<Self> {
1995        if !Self::is_prefilter_candidate(expr) {
1996            return None;
1997        }
1998        let expr_str = format!("{expr:?}");
1999        let column_name = Self::single_column_name(expr)?;
2000        let column_metadata = match expected_meta {
2001            Some(meta) => {
2002                let column = meta.column_by_name(&column_name)?;
2003                let sst_column = sst_meta.column_by_id(column.column_id)?;
2004                // Physical expr requires the column name to match the SST column name.
2005                if sst_column.column_schema.name != column_name {
2006                    return None;
2007                }
2008                column
2009            }
2010            None => sst_meta.column_by_name(&column_name)?,
2011        };
2012
2013        // The column must be present in the projected arrow schema for the
2014        // prefilter to be able to read it.
2015        let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
2016        let field = field.clone();
2017        let schema = Arc::new(ArrowSchema::new(vec![field]));
2018        let physical_expr = Predicate::to_physical_expr(expr, &schema)
2019            .inspect_err(|e| {
2020                error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
2021            })
2022            .ok()?;
2023        let immutable = expr_is_immutable(expr);
2024
2025        Some(Self {
2026            filter: physical_expr,
2027            expr_str,
2028            column_id: column_metadata.column_id,
2029            column_name,
2030            semantic_type: column_metadata.semantic_type,
2031            schema,
2032            immutable,
2033        })
2034    }
2035
2036    /// Returns true if the expression is a variant we want to evaluate as a
2037    /// physical prefilter. Binary exprs are intentionally excluded because
2038    /// [`SimpleFilterEvaluator`] already handles them.
2039    // TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations)
2040    fn is_prefilter_candidate(expr: &Expr) -> bool {
2041        matches!(
2042            expr,
2043            Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
2044        )
2045    }
2046
2047    fn single_column_name(expr: &Expr) -> Option<String> {
2048        let mut columns = HashSet::new();
2049        if expr_to_columns(expr, &mut columns).is_err() {
2050            return None;
2051        }
2052        if columns.len() != 1 {
2053            return None;
2054        }
2055        columns.iter().next().map(|column| column.name.clone())
2056    }
2057
2058    /// Returns the filter to evaluate.
2059    pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
2060        &self.filter
2061    }
2062
2063    /// Returns the original logical expression string.
2064    pub(crate) fn expr_str(&self) -> &str {
2065        &self.expr_str
2066    }
2067
2068    /// Returns the column id.
2069    pub(crate) fn column_id(&self) -> ColumnId {
2070        self.column_id
2071    }
2072
2073    /// Returns the column name.
2074    pub(crate) fn column_name(&self) -> &str {
2075        &self.column_name
2076    }
2077
2078    /// Returns the semantic type of the column.
2079    pub(crate) fn semantic_type(&self) -> SemanticType {
2080        self.semantic_type
2081    }
2082
2083    /// Returns the schema containing only the referenced column.
2084    pub(crate) fn schema(&self) -> &SchemaRef {
2085        &self.schema
2086    }
2087
2088    /// Returns true if the original logical expression is immutable across queries.
2089    pub(crate) fn is_immutable(&self) -> bool {
2090        self.immutable
2091    }
2092}
2093
2094fn expr_is_immutable(expr: &Expr) -> bool {
2095    let mut is_immutable = true;
2096    let _ = expr.apply(|expr| match expr {
2097        Expr::ScalarFunction(function)
2098            if function.func.signature().volatility != Volatility::Immutable =>
2099        {
2100            is_immutable = false;
2101            Ok(TreeNodeRecursion::Stop)
2102        }
2103        Expr::ScalarVariable(_, _) => {
2104            is_immutable = false;
2105            Ok(TreeNodeRecursion::Stop)
2106        }
2107        _ => Ok(TreeNodeRecursion::Continue),
2108    });
2109    is_immutable
2110}
2111
2112/// Prune a column by its default value.
2113/// Returns false if we can't create the default value or evaluate the filter.
2114fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
2115    let value = column.column_schema.create_default().ok().flatten()?;
2116    let scalar_value = value
2117        .try_to_scalar_value(&column.column_schema.data_type)
2118        .ok()?;
2119    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
2120    Some(!matches)
2121}
2122
2123/// Parquet batch reader to read our SST format.
2124pub struct ParquetReader {
2125    /// File range context.
2126    context: FileRangeContextRef,
2127    /// Row group selection to read.
2128    selection: RowGroupSelection,
2129    /// Reader of current row group.
2130    reader: Option<FlatPruneReader>,
2131    /// Metrics for tracking row group fetch operations.
2132    fetch_metrics: ParquetFetchMetrics,
2133}
2134
2135impl ParquetReader {
2136    #[tracing::instrument(
2137        skip_all,
2138        fields(
2139            region_id = %self.context.reader_builder().file_handle.region_id(),
2140            file_id = %self.context.reader_builder().file_handle.file_id()
2141        )
2142    )]
2143    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2144        loop {
2145            if let Some(reader) = &mut self.reader {
2146                if let Some(batch) = reader.next_batch().await? {
2147                    return Ok(Some(batch));
2148                }
2149                self.reader = None;
2150                continue;
2151            }
2152
2153            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
2154                return Ok(None);
2155            };
2156
2157            let skip_fields = self.context.pre_filter_mode().skip_fields();
2158            let parquet_reader = self
2159                .context
2160                .reader_builder()
2161                .build(self.context.build_context(
2162                    row_group_idx,
2163                    Some(row_selection),
2164                    Some(&self.fetch_metrics),
2165                ))
2166                .await?;
2167            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
2168                self.context.clone(),
2169                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
2170                skip_fields,
2171            ));
2172        }
2173    }
2174    /// Creates a new reader.
2175    #[tracing::instrument(
2176        skip_all,
2177        fields(
2178            region_id = %context.reader_builder().file_handle.region_id(),
2179            file_id = %context.reader_builder().file_handle.file_id()
2180        )
2181    )]
2182    pub(crate) async fn new(
2183        context: FileRangeContextRef,
2184        mut selection: RowGroupSelection,
2185    ) -> Result<Self> {
2186        let fetch_metrics = ParquetFetchMetrics::default();
2187        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
2188            let skip_fields = context.pre_filter_mode().skip_fields();
2189            let parquet_reader = context
2190                .reader_builder()
2191                .build(context.build_context(
2192                    row_group_idx,
2193                    Some(row_selection),
2194                    Some(&fetch_metrics),
2195                ))
2196                .await?;
2197            Some(FlatPruneReader::new_with_row_group_reader(
2198                context.clone(),
2199                FlatRowGroupReader::new(context.clone(), parquet_reader),
2200                skip_fields,
2201            ))
2202        } else {
2203            None
2204        };
2205
2206        Ok(ParquetReader {
2207            context,
2208            selection,
2209            reader,
2210            fetch_metrics,
2211        })
2212    }
2213
2214    /// Returns the metadata of the SST.
2215    pub fn metadata(&self) -> &RegionMetadataRef {
2216        self.context.read_format().metadata()
2217    }
2218
2219    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2220        self.context.reader_builder().parquet_meta.clone()
2221    }
2222}
2223
2224/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
2225pub(crate) struct FlatRowGroupReader {
2226    /// Context for file ranges.
2227    context: FileRangeContextRef,
2228    /// Inner parquet record batch stream.
2229    stream: ProjectedRecordBatchStream,
2230    /// Cached sequence array to override sequences.
2231    override_sequence: Option<ArrayRef>,
2232}
2233
2234impl FlatRowGroupReader {
2235    /// Creates a new flat reader from file range.
2236    pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2237        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2238        let override_sequence = context
2239            .read_format()
2240            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2241
2242        Self {
2243            context,
2244            stream,
2245            override_sequence,
2246        }
2247    }
2248
2249    /// Returns the next RecordBatch.
2250    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2251        match self.stream.next().await {
2252            Some(batch_result) => {
2253                let record_batch = batch_result?;
2254
2255                let record_batch = self
2256                    .context
2257                    .read_format()
2258                    .convert_batch(record_batch, self.override_sequence.as_ref())?;
2259                Ok(Some(record_batch))
2260            }
2261            None => Ok(None),
2262        }
2263    }
2264}
2265
2266#[cfg(test)]
2267mod tests {
2268    use std::any::Any;
2269    use std::fmt::{Debug, Formatter};
2270    use std::sync::{Arc, LazyLock};
2271
2272    use datafusion::arrow::datatypes::DataType;
2273    use datafusion_common::ScalarValue;
2274    use datafusion_expr::expr::ScalarFunction;
2275    use datafusion_expr::{
2276        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2277        col, lit,
2278    };
2279    use datatypes::arrow::array::{ArrayRef, Int64Array};
2280    use datatypes::arrow::record_batch::RecordBatch;
2281    use datatypes::prelude::ConcreteDataType;
2282    use datatypes::schema::ColumnSchema;
2283    use object_store::services::Memory;
2284    use parquet::arrow::ArrowWriter;
2285    use parquet::file::properties::WriterProperties;
2286    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2287    use store_api::region_request::PathType;
2288    use store_api::storage::RegionId;
2289    use table::predicate::Predicate;
2290
2291    use super::*;
2292    use crate::sst::parquet::metadata::MetadataLoader;
2293    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2294
2295    #[tokio::test(flavor = "current_thread")]
2296    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2297        #[derive(Eq, PartialEq, Hash)]
2298        struct PanicDebugUdf;
2299
2300        impl Debug for PanicDebugUdf {
2301            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2302                panic!("minmax predicate key should not format exprs when cache is disabled");
2303            }
2304        }
2305
2306        impl ScalarUDFImpl for PanicDebugUdf {
2307            fn as_any(&self) -> &dyn Any {
2308                self
2309            }
2310
2311            fn name(&self) -> &str {
2312                "panic_debug_udf"
2313            }
2314
2315            fn signature(&self) -> &Signature {
2316                static SIGNATURE: LazyLock<Signature> =
2317                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2318                &SIGNATURE
2319            }
2320
2321            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2322                Ok(DataType::Int64)
2323            }
2324
2325            fn invoke_with_args(
2326                &self,
2327                _args: ScalarFunctionArgs,
2328            ) -> datafusion_common::Result<ColumnarValue> {
2329                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2330            }
2331        }
2332
2333        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2334        let file_handle = sst_file_handle(0, 1);
2335        let table_dir = "test_table".to_string();
2336        let path_type = PathType::Bare;
2337        let file_path = file_handle.file_path(&table_dir, path_type);
2338
2339        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2340        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2341        let mut parquet_bytes = Vec::new();
2342        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2343        writer.write(&batch).unwrap();
2344        writer.close().unwrap();
2345        let file_size = parquet_bytes.len() as u64;
2346        object_store.write(&file_path, parquet_bytes).await.unwrap();
2347
2348        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2349        let read_format = FlatReadFormat::new(
2350            region_metadata.clone(),
2351            ReadColumns::from_deduped_column_ids(
2352                region_metadata
2353                    .column_metadatas
2354                    .iter()
2355                    .map(|column| column.column_id),
2356            ),
2357            None,
2358            &file_path,
2359            false,
2360        )
2361        .unwrap();
2362
2363        let mut cache_metrics = MetadataCacheMetrics::default();
2364        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2365        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2366
2367        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2368        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2369            udf,
2370            vec![],
2371        ))]);
2372        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2373            .predicate(Some(predicate))
2374            .cache(CacheStrategy::Disabled);
2375
2376        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2377        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2378        let mut metrics = ReaderFilterMetrics::default();
2379        let selection = builder.row_groups_by_minmax(
2380            &read_format,
2381            &parquet_meta,
2382            row_group_size,
2383            total_row_count,
2384            &mut metrics,
2385            false,
2386        );
2387
2388        assert!(!selection.is_empty());
2389    }
2390
2391    #[test]
2392    fn test_expr_is_immutable_checks_scalar_function_volatility() {
2393        #[derive(Debug, PartialEq, Eq, Hash)]
2394        struct TestVolatilityUdf {
2395            name: String,
2396            signature: Signature,
2397        }
2398
2399        impl TestVolatilityUdf {
2400            fn new(name: &str, volatility: Volatility) -> Self {
2401                Self {
2402                    name: name.to_string(),
2403                    signature: Signature::variadic_any(volatility),
2404                }
2405            }
2406        }
2407
2408        impl ScalarUDFImpl for TestVolatilityUdf {
2409            fn as_any(&self) -> &dyn Any {
2410                self
2411            }
2412
2413            fn name(&self) -> &str {
2414                &self.name
2415            }
2416
2417            fn signature(&self) -> &Signature {
2418                &self.signature
2419            }
2420
2421            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2422                Ok(DataType::Int64)
2423            }
2424
2425            fn invoke_with_args(
2426                &self,
2427                _args: ScalarFunctionArgs,
2428            ) -> datafusion_common::Result<ColumnarValue> {
2429                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2430            }
2431        }
2432
2433        let expr = |name: &str, volatility| {
2434            Expr::ScalarFunction(ScalarFunction::new_udf(
2435                Arc::new(ScalarUDF::new_from_impl(TestVolatilityUdf::new(
2436                    name, volatility,
2437                ))),
2438                vec![],
2439            ))
2440        };
2441
2442        assert!(expr_is_immutable(&expr(
2443            "immutable_udf",
2444            Volatility::Immutable
2445        )));
2446        assert!(!expr_is_immutable(&expr("stable_udf", Volatility::Stable)));
2447        assert!(!expr_is_immutable(&expr(
2448            "volatile_udf",
2449            Volatility::Volatile
2450        )));
2451
2452        let scalar_variable = Expr::ScalarVariable(
2453            Arc::new(Field::new("@@version", DataType::Utf8, false)),
2454            vec!["@@version".to_string()],
2455        );
2456        assert!(!expr_is_immutable(&scalar_variable));
2457    }
2458
2459    #[tokio::test(flavor = "current_thread")]
2460    async fn test_has_row_level_selection() {
2461        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2462        let file_path = "row_level_selection.parquet";
2463
2464        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
2465        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2466        let props = WriterProperties::builder()
2467            .set_max_row_group_row_count(Some(3))
2468            .build();
2469        let mut parquet_bytes = Vec::new();
2470        let mut writer =
2471            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
2472        writer.write(&batch).unwrap();
2473        writer.close().unwrap();
2474        let file_size = parquet_bytes.len() as u64;
2475        object_store.write(file_path, parquet_bytes).await.unwrap();
2476
2477        let mut cache_metrics = MetadataCacheMetrics::default();
2478        let loader = MetadataLoader::new(object_store, file_path, file_size);
2479        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2480        assert_eq!(2, parquet_meta.num_row_groups());
2481
2482        let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
2483        assert!(!has_row_level_selection(&full_row_groups, &parquet_meta));
2484
2485        let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
2486        assert!(has_row_level_selection(&prefix_selection, &parquet_meta));
2487
2488        let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
2489        assert!(has_row_level_selection(&interior_selection, &parquet_meta));
2490    }
2491
2492    fn expected_metadata_with_reused_tag_name(
2493        old_metadata: &RegionMetadata,
2494    ) -> Arc<RegionMetadata> {
2495        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2496        builder
2497            .push_column_metadata(ColumnMetadata {
2498                column_schema: ColumnSchema::new(
2499                    "tag_0".to_string(),
2500                    ConcreteDataType::string_datatype(),
2501                    true,
2502                ),
2503                semantic_type: SemanticType::Tag,
2504                column_id: 10,
2505            })
2506            .push_column_metadata(ColumnMetadata {
2507                column_schema: ColumnSchema::new(
2508                    "tag_1".to_string(),
2509                    ConcreteDataType::string_datatype(),
2510                    true,
2511                ),
2512                semantic_type: SemanticType::Tag,
2513                column_id: 1,
2514            })
2515            .push_column_metadata(ColumnMetadata {
2516                column_schema: ColumnSchema::new(
2517                    "field_0".to_string(),
2518                    ConcreteDataType::uint64_datatype(),
2519                    true,
2520                ),
2521                semantic_type: SemanticType::Field,
2522                column_id: 2,
2523            })
2524            .push_column_metadata(ColumnMetadata {
2525                column_schema: ColumnSchema::new(
2526                    "ts".to_string(),
2527                    ConcreteDataType::timestamp_millisecond_datatype(),
2528                    false,
2529                ),
2530                semantic_type: SemanticType::Timestamp,
2531                column_id: 3,
2532            })
2533            .primary_key(vec![10, 1]);
2534
2535        Arc::new(builder.build().unwrap())
2536    }
2537
2538    #[test]
2539    fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
2540        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2541        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2542        let ctx = SimpleFilterContext::new_opt(
2543            &metadata,
2544            Some(expected_metadata.as_ref()),
2545            &col("tag_0").eq(lit("a")),
2546        )
2547        .unwrap();
2548        assert!(matches!(
2549            ctx.filter(),
2550            MaybeFilter::Matched | MaybeFilter::Pruned
2551        ));
2552    }
2553
2554    #[test]
2555    fn test_simple_filter_context_drops_mismatched_field_filter() {
2556        let (sst_metadata, latest_metadata) = mock_metadata();
2557        let ctx = SimpleFilterContext::new_opt(
2558            &sst_metadata,
2559            Some(latest_metadata.as_ref()),
2560            &col("field_0").eq(lit(1_i64)),
2561        );
2562
2563        assert!(ctx.is_none());
2564    }
2565
2566    fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
2567        let region_id = RegionId::new(1, 1);
2568        let make_tag_0 = || ColumnMetadata {
2569            column_schema: ColumnSchema::new(
2570                "tag_0".to_string(),
2571                ConcreteDataType::string_datatype(),
2572                true,
2573            ),
2574            semantic_type: SemanticType::Tag,
2575            column_id: 0,
2576        };
2577        let make_ts = || ColumnMetadata {
2578            column_schema: ColumnSchema::new(
2579                "ts".to_string(),
2580                ConcreteDataType::timestamp_millisecond_datatype(),
2581                false,
2582            ),
2583            semantic_type: SemanticType::Timestamp,
2584            column_id: 2,
2585        };
2586        let make_field_0 = |data_type| ColumnMetadata {
2587            column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
2588            semantic_type: SemanticType::Field,
2589            column_id: 1,
2590        };
2591
2592        let mut sst_builder = RegionMetadataBuilder::new(region_id);
2593        sst_builder
2594            .push_column_metadata(make_tag_0())
2595            .push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
2596            .push_column_metadata(make_ts())
2597            .primary_key(vec![0]);
2598        let sst_metadata = Arc::new(sst_builder.build().unwrap());
2599
2600        let mut expected_builder = RegionMetadataBuilder::new(region_id);
2601        expected_builder
2602            .push_column_metadata(make_tag_0())
2603            .push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
2604            .push_column_metadata(make_ts())
2605            .primary_key(vec![0]);
2606
2607        let expected_metadata = Arc::new(expected_builder.build().unwrap());
2608
2609        (sst_metadata, expected_metadata)
2610    }
2611
2612    #[test]
2613    fn test_physical_filter_context_skips_renamed_column() {
2614        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2615        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2616        let read_format = FlatReadFormat::new(
2617            metadata.clone(),
2618            ReadColumns::from_deduped_column_ids(
2619                metadata.column_metadatas.iter().map(|c| c.column_id),
2620            ),
2621            None,
2622            "test",
2623            true,
2624        )
2625        .unwrap();
2626
2627        let ctx = PhysicalFilterContext::new_opt(
2628            &metadata,
2629            Some(expected_metadata.as_ref()),
2630            &read_format,
2631            &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
2632        );
2633
2634        assert!(ctx.is_none());
2635    }
2636
2637    #[test]
2638    fn test_physical_filter_context_only_accepts_prefilter_candidates() {
2639        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2640        let read_format = FlatReadFormat::new(
2641            metadata.clone(),
2642            ReadColumns::from_deduped_column_ids(
2643                metadata.column_metadatas.iter().map(|c| c.column_id),
2644            ),
2645            None,
2646            "test",
2647            true,
2648        )
2649        .unwrap();
2650
2651        // InList is on the allowlist — should build a context.
2652        let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
2653        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
2654
2655        // NOT IN uses the same variant with `negated: true` — also accepted.
2656        let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
2657        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some());
2658
2659        // IS NULL / IS NOT NULL are accepted.
2660        let is_null = col("tag_0").is_null();
2661        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
2662        let is_not_null = col("tag_0").is_not_null();
2663        assert!(
2664            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
2665        );
2666
2667        // BETWEEN is accepted.
2668        let between = col("field_0").between(lit(1_u64), lit(10_u64));
2669        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
2670
2671        // Binary expr is handled by SimpleFilterEvaluator — rejected here.
2672        let binary = col("tag_0").eq(lit("a"));
2673        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
2674    }
2675}