Skip to main content

mito2/sst/parquet/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet reader.
16
17mod stream;
18
19#[cfg(feature = "vector_index")]
20use std::collections::BTreeSet;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{error, tracing, warn};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::{Expr, Volatility};
32use datatypes::arrow::array::ArrayRef;
33use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::data_type::ConcreteDataType;
36use datatypes::extension::json::is_structured_json_field;
37use datatypes::prelude::DataType;
38use futures::StreamExt;
39use mito_codec::row_converter::build_primary_key_codec;
40use object_store::ObjectStore;
41use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
42use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema};
43use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
44use parquet::file::properties::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT;
45use partition::expr::PartitionExpr;
46use snafu::ResultExt;
47use store_api::codec::PrimaryKeyEncoding;
48use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
49use store_api::region_request::PathType;
50use store_api::storage::{ColumnId, FileId};
51use table::predicate::Predicate;
52
53use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream};
54use crate::cache::index::result_cache::PredicateKey;
55use crate::cache::{CacheStrategy, CachedSstMeta};
56#[cfg(feature = "vector_index")]
57use crate::error::ApplyVectorIndexSnafu;
58use crate::error::{
59    ParquetToArrowSchemaSnafu, ReadDataPartSnafu, Result, SerializePartitionExprSnafu,
60};
61use crate::metrics::{
62    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
63    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
64};
65use crate::read::flat_projection::CompactionProjectionMapper;
66use crate::read::prune::FlatPruneReader;
67use crate::read::read_columns::ReadColumns;
68use crate::sst::file::FileHandle;
69use crate::sst::index::bloom_filter::applier::{
70    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
71};
72use crate::sst::index::fulltext_index::applier::{
73    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
74};
75use crate::sst::index::inverted_index::applier::{
76    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
77};
78#[cfg(feature = "vector_index")]
79use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
80use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
81use crate::sst::parquet::file_range::{
82    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
83};
84use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
85use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, need_override_sequence};
86use crate::sst::parquet::metadata::MetadataLoader;
87use crate::sst::parquet::prefilter::{
88    PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
89};
90use crate::sst::parquet::push_decoder::{
91    SstParquetRangeFetcher, build_sst_parquet_record_batch_stream,
92};
93use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
94use crate::sst::parquet::row_group::ParquetFetchMetrics;
95use crate::sst::parquet::row_selection::RowGroupSelection;
96use crate::sst::parquet::stats::RowGroupPruningStats;
97use crate::sst::{override_pk_field_to_binary, tag_maybe_to_dictionary_field};
98
/// Index type label for the fulltext index; it is passed to
/// `apply_selection_and_update_metrics` when a fulltext index result is applied.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";

/// Number of leading row groups sampled by [`should_read_pk_as_binary`].
///
/// Only this many row groups (taken from the start of the file) are inspected.
const MAX_ROW_GROUPS_TO_CHECK_PK: usize = 4;
103
104/// Returns `true` if the `__primary_key` chunk in any of the first
105/// [`MAX_ROW_GROUPS_TO_CHECK_PK`] row groups exceeds the dictionary page size
106/// limit, signalling the writer likely fell back to plain encoding.
107fn should_read_pk_as_binary(parquet_meta: &ParquetMetaData) -> bool {
108    should_read_pk_as_binary_with_limit(parquet_meta, DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
109}
110
111fn should_read_pk_as_binary_with_limit(
112    parquet_meta: &ParquetMetaData,
113    dict_page_size_limit: usize,
114) -> bool {
115    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
116    if num_columns < INTERNAL_COLUMN_NUM {
117        return false;
118    }
119    let pk_idx = primary_key_column_index(num_columns);
120    parquet_meta
121        .row_groups()
122        .iter()
123        .take(MAX_ROW_GROUPS_TO_CHECK_PK)
124        .any(|rg| rg.column(pk_idx).uncompressed_size() as usize > dict_page_size_limit)
125}
// Index type labels for the remaining index kinds.
// NOTE(review): their uses are outside this view; presumably they are passed to
// `handle_index_error!` and the metrics helpers the same way as `INDEX_TYPE_FULLTEXT` — confirm.
/// Index type label for the inverted index.
const INDEX_TYPE_INVERTED: &str = "inverted";
/// Index type label for the bloom filter index.
const INDEX_TYPE_BLOOM: &str = "bloom filter";
/// Index type label for the vector index.
const INDEX_TYPE_VECTOR: &str = "vector";
129
/// Reports a failure to apply an index to an SST file.
///
/// Arguments: the error, an expression exposing `region_id()` and `file_id()` (the file
/// handle), and the index type label used in the message.
///
/// When `cfg!(any(test, feature = "test"))` is true the failure panics so it cannot go
/// unnoticed; otherwise it is only logged as a warning.
macro_rules! handle_index_error {
    ($error:expr, $handle:expr, $kind:expr) => {
        if !cfg!(any(test, feature = "test")) {
            warn!(
                $error; "Failed to apply {} index, region_id: {}, file_id: {}",
                $kind,
                $handle.region_id(),
                $handle.file_id()
            );
        } else {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $kind,
                $handle.region_id(),
                $handle.file_id(),
                $error
            );
        }
    };
}
150
/// Parquet SST reader builder.
///
/// Collects the options for reading one SST file (predicate, projection, cache,
/// index appliers, ...) and produces either a `ParquetReader` or the
/// `FileRangeContext` plus row group selection needed to read the file.
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Handle of the SST file to read; provides the file path, file id, region id
    /// and file-level metadata such as size and partition expression.
    file_handle: FileHandle,
    /// Object store used to load the parquet metadata; a clone is also handed to
    /// the row group reader builder.
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// The columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    read_cols: Option<ReadColumns>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    ///
    /// Each kind holds two optional appliers. For the fulltext index, only the first
    /// applier is used when the pre-filter mode skips fields.
    // NOTE(review): presumably the inverted and bloom filter appliers follow the same
    // "first applier is for tags" convention — their use is outside this view.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    /// Policy for loading the parquet page index together with the metadata.
    page_index_policy: PageIndexPolicy,
    /// When `true` and the policy is `PageIndexPolicy::Optional`, the metadata is first
    /// loaded with `PageIndexPolicy::Skip` and reloaded with the page index only if a
    /// prefilter or a row-level selection can make use of it.
    defer_optional_page_index: bool,
}
191
192impl ParquetReaderBuilder {
193    /// Returns a new [ParquetReaderBuilder] to read specific SST.
194    pub fn new(
195        table_dir: String,
196        path_type: PathType,
197        file_handle: FileHandle,
198        object_store: ObjectStore,
199    ) -> ParquetReaderBuilder {
200        ParquetReaderBuilder {
201            table_dir,
202            path_type,
203            file_handle,
204            object_store,
205            predicate: None,
206            read_cols: None,
207            cache_strategy: CacheStrategy::Disabled,
208            inverted_index_appliers: [None, None],
209            bloom_filter_index_appliers: [None, None],
210            fulltext_index_appliers: [None, None],
211            #[cfg(feature = "vector_index")]
212            vector_index_applier: None,
213            #[cfg(feature = "vector_index")]
214            vector_index_k: None,
215            expected_metadata: None,
216            compaction: false,
217            pre_filter_mode: PreFilterMode::All,
218            decode_primary_key_values: false,
219            page_index_policy: Default::default(),
220            defer_optional_page_index: false,
221        }
222    }
223
224    /// Attaches the predicate to the builder.
225    #[must_use]
226    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
227        self.predicate = predicate;
228        self
229    }
230
231    /// Attaches the projection to the builder.
232    ///
233    /// The reader only applies the projection to fields.
234    #[must_use]
235    pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
236        self.read_cols = read_cols;
237        self
238    }
239
240    /// Attaches the cache to the builder.
241    #[must_use]
242    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
243        self.cache_strategy = cache;
244        self
245    }
246
247    /// Attaches the inverted index appliers to the builder.
248    #[must_use]
249    pub(crate) fn inverted_index_appliers(
250        mut self,
251        index_appliers: [Option<InvertedIndexApplierRef>; 2],
252    ) -> Self {
253        self.inverted_index_appliers = index_appliers;
254        self
255    }
256
257    /// Attaches the bloom filter index appliers to the builder.
258    #[must_use]
259    pub(crate) fn bloom_filter_index_appliers(
260        mut self,
261        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
262    ) -> Self {
263        self.bloom_filter_index_appliers = index_appliers;
264        self
265    }
266
267    /// Attaches the fulltext index appliers to the builder.
268    #[must_use]
269    pub(crate) fn fulltext_index_appliers(
270        mut self,
271        index_appliers: [Option<FulltextIndexApplierRef>; 2],
272    ) -> Self {
273        self.fulltext_index_appliers = index_appliers;
274        self
275    }
276
277    /// Attaches the vector index applier to the builder.
278    #[cfg(feature = "vector_index")]
279    #[must_use]
280    pub(crate) fn vector_index_applier(
281        mut self,
282        applier: Option<VectorIndexApplierRef>,
283        k: Option<usize>,
284    ) -> Self {
285        self.vector_index_applier = applier;
286        self.vector_index_k = k;
287        self
288    }
289
290    /// Attaches the expected metadata to the builder.
291    #[must_use]
292    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
293        self.expected_metadata = expected_metadata;
294        self
295    }
296
297    /// Sets the compaction flag.
298    #[must_use]
299    pub fn compaction(mut self, compaction: bool) -> Self {
300        self.compaction = compaction;
301        self
302    }
303
304    /// Sets the pre-filter mode.
305    #[must_use]
306    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
307        self.pre_filter_mode = pre_filter_mode;
308        self
309    }
310
311    /// Decodes primary key values eagerly when reading primary key format SSTs.
312    #[must_use]
313    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
314        self.decode_primary_key_values = decode;
315        self
316    }
317
318    #[must_use]
319    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
320        self.page_index_policy = page_index_policy;
321        self
322    }
323
324    /// Defers loading optional page indexes until row-level selections can use them.
325    #[must_use]
326    pub(crate) fn deferred_optional_page_index(mut self) -> Self {
327        self.page_index_policy = PageIndexPolicy::Optional;
328        self.defer_optional_page_index = true;
329        self
330    }
331
332    /// Builds a [ParquetReader].
333    ///
334    /// This needs to perform IO operation.
335    #[tracing::instrument(
336        skip_all,
337        fields(
338            region_id = %self.file_handle.region_id(),
339            file_id = %self.file_handle.file_id()
340        )
341    )]
342    pub async fn build(&self) -> Result<Option<ParquetReader>> {
343        let mut metrics = ReaderMetrics::default();
344
345        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
346            return Ok(None);
347        };
348        ParquetReader::new(Arc::new(context), selection)
349            .await
350            .map(Some)
351    }
352
353    /// Builds a [FileRangeContext] and collects row groups to read.
354    ///
355    /// This needs to perform IO operation.
356    #[tracing::instrument(
357        skip_all,
358        fields(
359            region_id = %self.file_handle.region_id(),
360            file_id = %self.file_handle.file_id()
361        )
362    )]
363    pub async fn build_reader_input(
364        &self,
365        metrics: &mut ReaderMetrics,
366    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
367        self.build_reader_input_inner(metrics).await
368    }
369
370    async fn build_reader_input_inner(
371        &self,
372        metrics: &mut ReaderMetrics,
373    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
374        let start = Instant::now();
375
376        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
377        let file_size = self.file_handle.meta_ref().file_size;
378
379        // Loads parquet metadata of the file.
380        let initial_page_index_policy = if self.defer_optional_page_index
381            && self.page_index_policy == PageIndexPolicy::Optional
382        {
383            PageIndexPolicy::Skip
384        } else {
385            self.page_index_policy
386        };
387        let (sst_meta, mut cache_miss) = self
388            .read_parquet_metadata(
389                &file_path,
390                file_size,
391                &mut metrics.metadata_cache_metrics,
392                initial_page_index_policy,
393            )
394            .await?;
395        let mut parquet_meta = sst_meta.parquet_metadata();
396        let region_meta = sst_meta.region_metadata();
397        let region_partition_expr_str = self
398            .expected_metadata
399            .as_ref()
400            .and_then(|meta| meta.partition_expr.as_ref())
401            .map(|expr| expr.as_str());
402        let (_, is_same_region_partition) = Self::is_same_region_partition(
403            region_partition_expr_str,
404            self.file_handle.meta_ref().partition_expr.as_ref(),
405        )?;
406        // Skip auto convert when:
407        // - compaction is enabled
408        // - region partition expr is same with file partition expr (no need to auto convert)
409        let skip_auto_convert = self.compaction && is_same_region_partition;
410
411        // Build a compaction projection helper when:
412        // - compaction is enabled
413        // - region partition expr differs from file partition expr
414        // - flat format is enabled
415        // - primary key encoding is sparse
416        //
417        // This is applied after row-group filtering to align batches with flat output schema
418        // before compat handling.
419        let compaction_projection_mapper = if self.compaction
420            && !is_same_region_partition
421            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
422        {
423            Some(CompactionProjectionMapper::try_new(&region_meta)?)
424        } else {
425            None
426        };
427
428        let read_cols = if let Some(read_cols) = &self.read_cols {
429            read_cols.clone()
430        } else {
431            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
432            // Lists all column ids to read, we always use the expected metadata if possible.
433            ReadColumns::from_deduped_column_ids(
434                expected_meta
435                    .column_metadatas
436                    .iter()
437                    .map(|col| col.column_id),
438            )
439        };
440
441        let file_metadata = parquet_meta.file_metadata();
442        let parquet_schema_desc = file_metadata.schema_descr();
443        let file_schema =
444            parquet_to_arrow_schema(parquet_schema_desc, file_metadata.key_value_metadata())
445                .context(ParquetToArrowSchemaSnafu { file: &file_path })?;
446        let mut read_format = FlatReadFormat::new(
447            region_meta.clone(),
448            read_cols,
449            Some(Arc::new(file_schema)),
450            &file_path,
451            skip_auto_convert,
452        )?;
453        if need_override_sequence(&parquet_meta) {
454            read_format
455                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
456        }
457
458        // Computes the projection mask.
459        let parquet_read_cols = read_format.parquet_read_columns();
460        let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
461        let has_nested_projection = parquet_read_cols.has_nested();
462        let selection = self
463            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
464            .await;
465
466        if selection.is_empty() {
467            metrics.build_cost += start.elapsed();
468            return Ok(None);
469        }
470
471        let prune_schema = self
472            .expected_metadata
473            .as_ref()
474            .map(|meta| meta.schema.clone())
475            .unwrap_or_else(|| region_meta.schema.clone());
476
477        let dyn_filters = if let Some(predicate) = &self.predicate {
478            predicate.dyn_filters().as_ref().clone()
479        } else {
480            vec![]
481        };
482
483        let codec = build_primary_key_codec(read_format.metadata());
484
485        let filter_plan = build_reader_filter_plan(
486            self.predicate.as_ref(),
487            self.expected_metadata.as_deref(),
488            self.pre_filter_mode,
489            &read_format,
490            &codec,
491        );
492
493        if self.defer_optional_page_index
494            && self.page_index_policy == PageIndexPolicy::Optional
495            && (filter_plan.prefilter_builder.is_some()
496                || has_row_level_selection(&selection, &parquet_meta))
497        {
498            let (sst_meta, page_index_cache_miss) = self
499                .read_parquet_metadata(
500                    &file_path,
501                    file_size,
502                    &mut metrics.metadata_cache_metrics,
503                    PageIndexPolicy::Optional,
504                )
505                .await?;
506            parquet_meta = sst_meta.parquet_metadata();
507            cache_miss |= page_index_cache_miss;
508        }
509
510        // Trigger background download if metadata had a cache miss and selection is not empty
511        if cache_miss && !selection.is_empty() {
512            use crate::cache::file_cache::{FileType, IndexKey};
513            let index_key = IndexKey::new(
514                self.file_handle.region_id(),
515                self.file_handle.file_id().file_id(),
516                FileType::Parquet,
517            );
518            self.cache_strategy.maybe_download_background(
519                index_key,
520                file_path.clone(),
521                self.object_store.clone(),
522                file_size,
523            );
524        }
525
526        // Create ArrowReaderMetadata for async stream building.
527        let mut arrow_reader_options = ArrowReaderOptions::new();
528        if !read_format
529            .arrow_schema()
530            .fields()
531            .iter()
532            .any(is_structured_json_field)
533        {
534            // Read `__primary_key` as Binary when it's too large for dictionary
535            // encoding; convert_batch wraps it back to a DictionaryArray.
536            let schema_for_reader = if should_read_pk_as_binary(&parquet_meta) {
537                read_format.set_pk_as_binary()?;
538                override_pk_field_to_binary(read_format.arrow_schema())
539            } else {
540                read_format.arrow_schema().clone()
541            };
542            arrow_reader_options = arrow_reader_options.with_schema(schema_for_reader);
543        }
544        let arrow_metadata =
545            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
546                .context(ReadDataPartSnafu)?;
547
548        let output_schema = read_format.output_arrow_schema()?;
549
550        let reader_builder = RowGroupReaderBuilder {
551            file_handle: self.file_handle.clone(),
552            file_path,
553            parquet_meta,
554            arrow_metadata,
555            output_schema,
556            object_store: self.object_store.clone(),
557            projection: projection_plan,
558            has_nested_projection,
559            cache_strategy: self.cache_strategy.clone(),
560            prefilter_builder: filter_plan.prefilter_builder,
561        };
562
563        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
564
565        let context = FileRangeContext::new(
566            reader_builder,
567            RangeBase {
568                filters: filter_plan.remaining_simple_filters,
569                dyn_filters,
570                read_format,
571                expected_metadata: self.expected_metadata.clone(),
572                prune_schema,
573                codec,
574                compat_batch: None,
575                compaction_projection_mapper,
576                pre_filter_mode: self.pre_filter_mode,
577                partition_filter,
578            },
579        );
580
581        metrics.build_cost += start.elapsed();
582
583        Ok(Some((context, selection)))
584    }
585
586    fn is_same_region_partition(
587        region_partition_expr_str: Option<&str>,
588        file_partition_expr: Option<&PartitionExpr>,
589    ) -> Result<(Option<PartitionExpr>, bool)> {
590        let region_partition_expr = match region_partition_expr_str {
591            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
592            None => None,
593        };
594
595        let is_same = region_partition_expr.as_ref() == file_partition_expr;
596        Ok((region_partition_expr, is_same))
597    }
598
599    /// Compare partition expressions from expected metadata and file metadata,
600    /// and build a partition filter if they differ.
601    fn build_partition_filter(
602        &self,
603        read_format: &FlatReadFormat,
604        prune_schema: &Arc<datatypes::schema::Schema>,
605    ) -> Result<Option<PartitionFilterContext>> {
606        let region_partition_expr_str = self
607            .expected_metadata
608            .as_ref()
609            .and_then(|meta| meta.partition_expr.as_ref());
610        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
611
612        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
613            region_partition_expr_str.map(|s| s.as_str()),
614            file_partition_expr_ref,
615        )?;
616
617        if is_same_region_partition {
618            return Ok(None);
619        }
620
621        let Some(region_partition_expr) = region_partition_expr else {
622            return Ok(None);
623        };
624
625        // Collect columns referenced by the partition expression.
626        let mut referenced_columns = HashSet::new();
627        region_partition_expr.collect_column_names(&mut referenced_columns);
628
629        // Build a partition_schema containing only referenced columns.
630        let partition_schema = Arc::new(datatypes::schema::Schema::new(
631            prune_schema
632                .column_schemas()
633                .iter()
634                .filter(|col| referenced_columns.contains(&col.name))
635                .map(|col| {
636                    if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
637                        && column_meta.semantic_type == SemanticType::Tag
638                        && col.data_type.is_string()
639                    {
640                        let field = Arc::new(Field::new(
641                            &col.name,
642                            col.data_type.as_arrow_type(),
643                            col.is_nullable(),
644                        ));
645                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
646                        let mut column = col.clone();
647                        column.data_type =
648                            ConcreteDataType::from_arrow_type(dict_field.data_type());
649                        return column;
650                    }
651
652                    col.clone()
653                })
654                .collect::<Vec<_>>(),
655        ));
656
657        let region_partition_physical_expr = region_partition_expr
658            .try_as_physical_expr(partition_schema.arrow_schema())
659            .context(SerializePartitionExprSnafu)?;
660
661        Ok(Some(PartitionFilterContext {
662            region_partition_physical_expr,
663            partition_schema,
664        }))
665    }
666
667    /// Reads parquet metadata of specific file.
668    /// Returns (fused metadata, cache_miss_flag).
669    pub(crate) async fn read_parquet_metadata(
670        &self,
671        file_path: &str,
672        file_size: u64,
673        cache_metrics: &mut MetadataCacheMetrics,
674        page_index_policy: PageIndexPolicy,
675    ) -> Result<(Arc<CachedSstMeta>, bool)> {
676        let start = Instant::now();
677        let _t = READ_STAGE_ELAPSED
678            .with_label_values(&["read_parquet_metadata"])
679            .start_timer();
680
681        let file_id = self.file_handle.file_id();
682        // Tries to get from cache with metrics tracking.
683        if let Some(metadata) = self
684            .cache_strategy
685            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
686            .await
687        {
688            cache_metrics.metadata_load_cost += start.elapsed();
689            return Ok((metadata, false));
690        }
691
692        // Cache miss, load metadata directly.
693        let mut metadata_loader =
694            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
695        metadata_loader.with_page_index_policy(page_index_policy);
696        let metadata = metadata_loader.load(cache_metrics).await?;
697
698        let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
699            file_path,
700            metadata,
701            None,
702            page_index_policy,
703        )?);
704        // Cache the metadata.
705        self.cache_strategy
706            .put_sst_meta_data(file_id, metadata.clone());
707
708        cache_metrics.metadata_load_cost += start.elapsed();
709        Ok((metadata, true))
710    }
711
712    /// Computes row groups to read, along with their respective row selections.
713    #[tracing::instrument(
714        skip_all,
715        fields(
716            region_id = %self.file_handle.region_id(),
717            file_id = %self.file_handle.file_id()
718        )
719    )]
720    async fn row_groups_to_read(
721        &self,
722        read_format: &FlatReadFormat,
723        parquet_meta: &ParquetMetaData,
724        metrics: &mut ReaderFilterMetrics,
725    ) -> RowGroupSelection {
726        let num_row_groups = parquet_meta.num_row_groups();
727        let num_rows = parquet_meta.file_metadata().num_rows();
728        if num_row_groups == 0 || num_rows == 0 {
729            return RowGroupSelection::default();
730        }
731
732        // Let's assume that the number of rows in the first row group
733        // can represent the `row_group_size` of the Parquet file.
734        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
735        if row_group_size == 0 {
736            return RowGroupSelection::default();
737        }
738
739        metrics.rg_total += num_row_groups;
740        metrics.rows_total += num_rows as usize;
741
742        // Compute skip_fields once for all pruning operations
743        let skip_fields = self.pre_filter_mode.skip_fields();
744
745        let mut output = self.row_groups_by_minmax(
746            read_format,
747            parquet_meta,
748            row_group_size,
749            num_rows as usize,
750            metrics,
751            skip_fields,
752        );
753        if output.is_empty() {
754            return output;
755        }
756
757        let fulltext_filtered = self
758            .prune_row_groups_by_fulltext_index(
759                row_group_size,
760                num_row_groups,
761                &mut output,
762                metrics,
763                skip_fields,
764            )
765            .await;
766        if output.is_empty() {
767            return output;
768        }
769
770        self.prune_row_groups_by_inverted_index(
771            read_format.metadata(),
772            row_group_size,
773            num_row_groups,
774            &mut output,
775            metrics,
776            skip_fields,
777        )
778        .await;
779        if output.is_empty() {
780            return output;
781        }
782
783        self.prune_row_groups_by_bloom_filter(
784            read_format.metadata(),
785            row_group_size,
786            parquet_meta,
787            &mut output,
788            metrics,
789            skip_fields,
790        )
791        .await;
792        if output.is_empty() {
793            return output;
794        }
795
796        if !fulltext_filtered {
797            self.prune_row_groups_by_fulltext_bloom(
798                row_group_size,
799                parquet_meta,
800                &mut output,
801                metrics,
802                skip_fields,
803            )
804            .await;
805        }
806        #[cfg(feature = "vector_index")]
807        {
808            self.prune_row_groups_by_vector_index(
809                row_group_size,
810                num_row_groups,
811                &mut output,
812                metrics,
813            )
814            .await;
815            if output.is_empty() {
816                return output;
817            }
818        }
819        output
820    }
821
822    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
823    async fn prune_row_groups_by_fulltext_index(
824        &self,
825        row_group_size: usize,
826        num_row_groups: usize,
827        output: &mut RowGroupSelection,
828        metrics: &mut ReaderFilterMetrics,
829        skip_fields: bool,
830    ) -> bool {
831        if !self.file_handle.meta_ref().fulltext_index_available() {
832            return false;
833        }
834
835        let mut pruned = false;
836        // If skip_fields is true, only apply the first applier (for tags).
837        let appliers = if skip_fields {
838            &self.fulltext_index_appliers[..1]
839        } else {
840            &self.fulltext_index_appliers[..]
841        };
842        for index_applier in appliers.iter().flatten() {
843            let predicate_key = index_applier.predicate_key();
844            // Fast path: return early if the result is in the cache.
845            let cached = self
846                .cache_strategy
847                .index_result_cache()
848                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
849            if let Some(result) = cached.as_ref()
850                && all_required_row_groups_searched(output, result)
851            {
852                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
853                metrics.fulltext_index_cache_hit += 1;
854                pruned = true;
855                continue;
856            }
857
858            // Slow path: apply the index from the file.
859            metrics.fulltext_index_cache_miss += 1;
860            let file_size_hint = self.file_handle.meta_ref().index_file_size();
861            let apply_res = index_applier
862                .apply_fine(
863                    self.file_handle.index_id(),
864                    Some(file_size_hint),
865                    metrics.fulltext_index_apply_metrics.as_mut(),
866                )
867                .await;
868            let selection = match apply_res {
869                Ok(Some(res)) => {
870                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
871                }
872                Ok(None) => continue,
873                Err(err) => {
874                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
875                    continue;
876                }
877            };
878
879            self.apply_index_result_and_update_cache(
880                predicate_key,
881                self.file_handle.file_id().file_id(),
882                selection,
883                output,
884                metrics,
885                INDEX_TYPE_FULLTEXT,
886            );
887            pruned = true;
888        }
889        pruned
890    }
891
892    /// Applies index to prune row groups.
893    ///
894    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
895    /// as an escape route in case of index issues, and it can be used to test
896    /// the correctness of the index.
897    async fn prune_row_groups_by_inverted_index(
898        &self,
899        sst_metadata: &RegionMetadataRef,
900        row_group_size: usize,
901        num_row_groups: usize,
902        output: &mut RowGroupSelection,
903        metrics: &mut ReaderFilterMetrics,
904        skip_fields: bool,
905    ) -> bool {
906        if !self.file_handle.meta_ref().inverted_index_available() {
907            return false;
908        }
909
910        let mut pruned = false;
911        // If skip_fields is true, only apply the first applier (for tags).
912        let appliers = if skip_fields {
913            &self.inverted_index_appliers[..1]
914        } else {
915            &self.inverted_index_appliers[..]
916        };
917        for index_applier in appliers.iter().flatten() {
918            let Ok(Some(plan)) = index_applier
919                .plan_for_sst(sst_metadata)
920                .inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
921            else {
922                continue;
923            };
924
925            // Fast path: return early if the result is in the cache.
926            let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
927                let file_id = self.file_handle.file_id().file_id();
928                cache.get(&plan.predicate_key, file_id)
929            });
930
931            if let Some(result) = cached.as_ref()
932                && all_required_row_groups_searched(output, result)
933            {
934                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
935                metrics.inverted_index_cache_hit += 1;
936                pruned = true;
937                continue;
938            }
939
940            // Slow path: apply the index from the file.
941            metrics.inverted_index_cache_miss += 1;
942            let file_size_hint = self.file_handle.meta_ref().index_file_size();
943            let apply_res = index_applier
944                .apply(
945                    self.file_handle.index_id(),
946                    Some(file_size_hint),
947                    &plan.index_applier,
948                    metrics.inverted_index_apply_metrics.as_mut(),
949                )
950                .await;
951
952            let selection = match apply_res {
953                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
954                    row_group_size,
955                    num_row_groups,
956                    apply_output,
957                ),
958                Err(err) => {
959                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
960                    continue;
961                }
962            };
963
964            self.apply_index_result_and_update_cache(
965                &plan.predicate_key,
966                self.file_handle.file_id().file_id(),
967                selection,
968                output,
969                metrics,
970                INDEX_TYPE_INVERTED,
971            );
972            pruned = true;
973        }
974        pruned
975    }
976
977    async fn prune_row_groups_by_bloom_filter(
978        &self,
979        sst_metadata: &RegionMetadataRef,
980        row_group_size: usize,
981        parquet_meta: &ParquetMetaData,
982        output: &mut RowGroupSelection,
983        metrics: &mut ReaderFilterMetrics,
984        skip_fields: bool,
985    ) -> bool {
986        if !self.file_handle.meta_ref().bloom_filter_index_available() {
987            return false;
988        }
989
990        let mut pruned = false;
991        // If skip_fields is true, only apply the first applier (for tags).
992        let appliers = if skip_fields {
993            &self.bloom_filter_index_appliers[..1]
994        } else {
995            &self.bloom_filter_index_appliers[..]
996        };
997        for index_applier in appliers.iter().flatten() {
998            let Some(compatible_predicates) =
999                index_applier.compatible_predicate_for_sst(sst_metadata)
1000            else {
1001                continue;
1002            };
1003            let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone());
1004            // Fast path: return early if the result is in the cache.
1005            let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
1006                let file_id = self.file_handle.file_id().file_id();
1007                cache.get(&predicate_key, file_id)
1008            });
1009            if let Some(result) = cached.as_ref()
1010                && all_required_row_groups_searched(output, result)
1011            {
1012                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
1013                metrics.bloom_filter_cache_hit += 1;
1014                pruned = true;
1015                continue;
1016            }
1017
1018            // Slow path: apply the index from the file.
1019            metrics.bloom_filter_cache_miss += 1;
1020            let file_size_hint = self.file_handle.meta_ref().index_file_size();
1021            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
1022                (
1023                    rg.num_rows() as usize,
1024                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
1025                    output.contains_non_empty_row_group(i)
1026                        && cached
1027                            .as_ref()
1028                            .map(|c| !c.contains_row_group(i))
1029                            .unwrap_or(true),
1030                )
1031            });
1032            let apply_res = index_applier
1033                .apply(
1034                    self.file_handle.index_id(),
1035                    Some(file_size_hint),
1036                    &compatible_predicates,
1037                    rgs,
1038                    metrics.bloom_filter_apply_metrics.as_mut(),
1039                )
1040                .await;
1041            let mut selection = match apply_res {
1042                Ok(apply_output) => {
1043                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
1044                }
1045                Err(err) => {
1046                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
1047                    continue;
1048                }
1049            };
1050
1051            // New searched row groups are added to `selection`, concat them with `cached`.
1052            if let Some(cached) = cached.as_ref() {
1053                selection.concat(cached);
1054            }
1055
1056            self.apply_index_result_and_update_cache(
1057                &predicate_key,
1058                self.file_handle.file_id().file_id(),
1059                selection,
1060                output,
1061                metrics,
1062                INDEX_TYPE_BLOOM,
1063            );
1064            pruned = true;
1065        }
1066        pruned
1067    }
1068
1069    /// Prunes row groups by vector index results.
1070    #[cfg(feature = "vector_index")]
1071    async fn prune_row_groups_by_vector_index(
1072        &self,
1073        row_group_size: usize,
1074        num_row_groups: usize,
1075        output: &mut RowGroupSelection,
1076        metrics: &mut ReaderFilterMetrics,
1077    ) {
1078        let Some(applier) = &self.vector_index_applier else {
1079            return;
1080        };
1081        let Some(k) = self.vector_index_k else {
1082            return;
1083        };
1084        if !self.file_handle.meta_ref().vector_index_available() {
1085            return;
1086        }
1087
1088        let file_size_hint = self.file_handle.meta_ref().index_file_size();
1089        let apply_res = applier
1090            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
1091            .await;
1092        let row_ids = match apply_res {
1093            Ok(res) => res.row_offsets,
1094            Err(err) => {
1095                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1096                return;
1097            }
1098        };
1099
1100        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
1101        {
1102            Ok(selection) => selection,
1103            Err(err) => {
1104                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1105                return;
1106            }
1107        };
1108        metrics.rows_vector_selected += selection.row_count();
1109        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
1110    }
1111
1112    async fn prune_row_groups_by_fulltext_bloom(
1113        &self,
1114        row_group_size: usize,
1115        parquet_meta: &ParquetMetaData,
1116        output: &mut RowGroupSelection,
1117        metrics: &mut ReaderFilterMetrics,
1118        skip_fields: bool,
1119    ) -> bool {
1120        if !self.file_handle.meta_ref().fulltext_index_available() {
1121            return false;
1122        }
1123
1124        let mut pruned = false;
1125        // If skip_fields is true, only apply the first applier (for tags).
1126        let appliers = if skip_fields {
1127            &self.fulltext_index_appliers[..1]
1128        } else {
1129            &self.fulltext_index_appliers[..]
1130        };
1131        for index_applier in appliers.iter().flatten() {
1132            let predicate_key = index_applier.predicate_key();
1133            // Fast path: return early if the result is in the cache.
1134            let cached = self
1135                .cache_strategy
1136                .index_result_cache()
1137                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
1138            if let Some(result) = cached.as_ref()
1139                && all_required_row_groups_searched(output, result)
1140            {
1141                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
1142                metrics.fulltext_index_cache_hit += 1;
1143                pruned = true;
1144                continue;
1145            }
1146
1147            // Slow path: apply the index from the file.
1148            metrics.fulltext_index_cache_miss += 1;
1149            let file_size_hint = self.file_handle.meta_ref().index_file_size();
1150            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
1151                (
1152                    rg.num_rows() as usize,
1153                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
1154                    output.contains_non_empty_row_group(i)
1155                        && cached
1156                            .as_ref()
1157                            .map(|c| !c.contains_row_group(i))
1158                            .unwrap_or(true),
1159                )
1160            });
1161            let apply_res = index_applier
1162                .apply_coarse(
1163                    self.file_handle.index_id(),
1164                    Some(file_size_hint),
1165                    rgs,
1166                    metrics.fulltext_index_apply_metrics.as_mut(),
1167                )
1168                .await;
1169            let mut selection = match apply_res {
1170                Ok(Some(apply_output)) => {
1171                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
1172                }
1173                Ok(None) => continue,
1174                Err(err) => {
1175                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
1176                    continue;
1177                }
1178            };
1179
1180            // New searched row groups are added to `selection`, concat them with `cached`.
1181            if let Some(cached) = cached.as_ref() {
1182                selection.concat(cached);
1183            }
1184
1185            self.apply_index_result_and_update_cache(
1186                predicate_key,
1187                self.file_handle.file_id().file_id(),
1188                selection,
1189                output,
1190                metrics,
1191                INDEX_TYPE_FULLTEXT,
1192            );
1193            pruned = true;
1194        }
1195        pruned
1196    }
1197
1198    /// Computes row groups selection after min-max pruning.
1199    fn row_groups_by_minmax(
1200        &self,
1201        read_format: &FlatReadFormat,
1202        parquet_meta: &ParquetMetaData,
1203        row_group_size: usize,
1204        total_row_count: usize,
1205        metrics: &mut ReaderFilterMetrics,
1206        skip_fields: bool,
1207    ) -> RowGroupSelection {
1208        let Some(predicate) = &self.predicate else {
1209            return RowGroupSelection::new(row_group_size, total_row_count);
1210        };
1211
1212        let file_id = self.file_handle.file_id().file_id();
1213        let index_result_cache = self.cache_strategy.index_result_cache();
1214        let cached_minmax_key =
1215            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
1216                // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
1217                // building row-group pruning stats for identical predicates across queries.
1218                let mut exprs = predicate
1219                    .exprs()
1220                    .iter()
1221                    .map(|expr| format!("{expr:?}"))
1222                    .collect::<Vec<_>>();
1223                exprs.sort();
1224                let schema_version = self
1225                    .expected_metadata
1226                    .as_ref()
1227                    .map(|meta| meta.schema_version)
1228                    .unwrap_or_else(|| read_format.metadata().schema_version);
1229                Some(PredicateKey::new_minmax(
1230                    Arc::new(exprs),
1231                    schema_version,
1232                    skip_fields,
1233                ))
1234            } else {
1235                None
1236            };
1237
1238        if let Some(index_result_cache) = index_result_cache
1239            && let Some(predicate_key) = cached_minmax_key.as_ref()
1240        {
1241            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
1242                metrics.minmax_cache_hit += 1;
1243                let num_row_groups = parquet_meta.num_row_groups();
1244                metrics.rg_minmax_filtered +=
1245                    num_row_groups.saturating_sub(result.row_group_count());
1246                return (*result).clone();
1247            }
1248
1249            metrics.minmax_cache_miss += 1;
1250        }
1251
1252        let region_meta = read_format.metadata();
1253        let row_groups = parquet_meta.row_groups();
1254        let stats = RowGroupPruningStats::new(
1255            row_groups,
1256            read_format,
1257            self.expected_metadata.clone(),
1258            skip_fields,
1259        );
1260        let prune_schema = self
1261            .expected_metadata
1262            .as_ref()
1263            .map(|meta| meta.schema.arrow_schema())
1264            .unwrap_or_else(|| region_meta.schema.arrow_schema());
1265
1266        // Here we use the schema of the SST to build the physical expression. If the column
1267        // in the SST doesn't have the same column id as the column in the expected metadata,
1268        // we will get a None statistics for that column.
1269        let mask = predicate.prune_with_stats(&stats, prune_schema);
1270        let output = RowGroupSelection::from_full_row_group_ids(
1271            mask.iter()
1272                .enumerate()
1273                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
1274            row_group_size,
1275            total_row_count,
1276        );
1277
1278        metrics.rg_minmax_filtered += parquet_meta
1279            .num_row_groups()
1280            .saturating_sub(output.row_group_count());
1281
1282        if let Some(index_result_cache) = index_result_cache
1283            && let Some(predicate_key) = cached_minmax_key
1284        {
1285            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
1286        }
1287
1288        output
1289    }
1290
1291    fn apply_index_result_and_update_cache(
1292        &self,
1293        predicate_key: &PredicateKey,
1294        file_id: FileId,
1295        result: RowGroupSelection,
1296        output: &mut RowGroupSelection,
1297        metrics: &mut ReaderFilterMetrics,
1298        index_type: &str,
1299    ) {
1300        apply_selection_and_update_metrics(output, &result, metrics, index_type);
1301
1302        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1303            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1304        }
1305    }
1306}
1307
1308fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
1309    selection.iter().any(|(row_group_idx, row_selection)| {
1310        let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else {
1311            return false;
1312        };
1313
1314        row_selection.row_count() != row_group.num_rows() as usize
1315            || row_selection.iter().any(|selector| selector.skip)
1316    })
1317}
1318
1319fn apply_selection_and_update_metrics(
1320    output: &mut RowGroupSelection,
1321    result: &RowGroupSelection,
1322    metrics: &mut ReaderFilterMetrics,
1323    index_type: &str,
1324) {
1325    let intersection = output.intersect(result);
1326
1327    let row_group_count = output.row_group_count() - intersection.row_group_count();
1328    let row_count = output.row_count() - intersection.row_count();
1329
1330    metrics.update_index_metrics(index_type, row_group_count, row_count);
1331
1332    *output = intersection;
1333}
1334
/// Builds a [`RowGroupSelection`] from the row offsets returned by the vector index.
///
/// Offsets are deduplicated and ordered before the selection is built.
///
/// # Errors
///
/// Returns an `ApplyVectorIndex` error for the first offset that does not fit in a `u32`.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<u32>>>()?;

    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1357
1358fn all_required_row_groups_searched(
1359    required_row_groups: &RowGroupSelection,
1360    cached_row_groups: &RowGroupSelection,
1361) -> bool {
1362    required_row_groups.iter().all(|(rg_id, _)| {
1363        // Row group with no rows is not required to search.
1364        !required_row_groups.contains_non_empty_row_group(*rg_id)
1365            // The row group is already searched.
1366            || cached_row_groups.contains_row_group(*rg_id)
1367    })
1368}
1369
/// Metrics of filtering row groups and rows.
///
/// The struct groups several kinds of counters: row groups and rows filtered per index
/// type, index result cache hits/misses, optional per-applier metrics, and pruner
/// statistics. Instances are combined with `merge_from` and reported with `observe`.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    pub(crate) minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    pub(crate) minmax_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
    /// Number of files filtered by manifest time-range pruning.
    pub(crate) files_time_range_pruned: usize,
}
1434
1435impl ReaderFilterMetrics {
1436    /// Adds `other` metrics to this metrics.
1437    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1438        self.rg_total += other.rg_total;
1439        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1440        self.rg_inverted_filtered += other.rg_inverted_filtered;
1441        self.rg_minmax_filtered += other.rg_minmax_filtered;
1442        self.rg_bloom_filtered += other.rg_bloom_filtered;
1443        self.rg_vector_filtered += other.rg_vector_filtered;
1444
1445        self.rows_total += other.rows_total;
1446        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1447        self.rows_inverted_filtered += other.rows_inverted_filtered;
1448        self.rows_bloom_filtered += other.rows_bloom_filtered;
1449        self.rows_vector_filtered += other.rows_vector_filtered;
1450        self.rows_vector_selected += other.rows_vector_selected;
1451        self.rows_precise_filtered += other.rows_precise_filtered;
1452
1453        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1454        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1455        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1456        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1457        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1458        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1459        self.minmax_cache_hit += other.minmax_cache_hit;
1460        self.minmax_cache_miss += other.minmax_cache_miss;
1461
1462        self.pruner_cache_hit += other.pruner_cache_hit;
1463        self.pruner_cache_miss += other.pruner_cache_miss;
1464        self.pruner_prune_cost += other.pruner_prune_cost;
1465        self.files_time_range_pruned += other.files_time_range_pruned;
1466
1467        // Merge optional applier metrics
1468        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1469            self.inverted_index_apply_metrics
1470                .get_or_insert_with(Default::default)
1471                .merge_from(other_metrics);
1472        }
1473        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1474            self.bloom_filter_apply_metrics
1475                .get_or_insert_with(Default::default)
1476                .merge_from(other_metrics);
1477        }
1478        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1479            self.fulltext_index_apply_metrics
1480                .get_or_insert_with(Default::default)
1481                .merge_from(other_metrics);
1482        }
1483    }
1484
1485    /// Reports metrics.
1486    pub(crate) fn observe(&self) {
1487        READ_ROW_GROUPS_TOTAL
1488            .with_label_values(&["before_filtering"])
1489            .inc_by(self.rg_total as u64);
1490        READ_ROW_GROUPS_TOTAL
1491            .with_label_values(&["fulltext_index_filtered"])
1492            .inc_by(self.rg_fulltext_filtered as u64);
1493        READ_ROW_GROUPS_TOTAL
1494            .with_label_values(&["inverted_index_filtered"])
1495            .inc_by(self.rg_inverted_filtered as u64);
1496        READ_ROW_GROUPS_TOTAL
1497            .with_label_values(&["minmax_index_filtered"])
1498            .inc_by(self.rg_minmax_filtered as u64);
1499        READ_ROW_GROUPS_TOTAL
1500            .with_label_values(&["bloom_filter_index_filtered"])
1501            .inc_by(self.rg_bloom_filtered as u64);
1502        READ_ROW_GROUPS_TOTAL
1503            .with_label_values(&["vector_index_filtered"])
1504            .inc_by(self.rg_vector_filtered as u64);
1505
1506        PRECISE_FILTER_ROWS_TOTAL
1507            .with_label_values(&["parquet"])
1508            .inc_by(self.rows_precise_filtered as u64);
1509        READ_ROWS_IN_ROW_GROUP_TOTAL
1510            .with_label_values(&["before_filtering"])
1511            .inc_by(self.rows_total as u64);
1512        READ_ROWS_IN_ROW_GROUP_TOTAL
1513            .with_label_values(&["fulltext_index_filtered"])
1514            .inc_by(self.rows_fulltext_filtered as u64);
1515        READ_ROWS_IN_ROW_GROUP_TOTAL
1516            .with_label_values(&["inverted_index_filtered"])
1517            .inc_by(self.rows_inverted_filtered as u64);
1518        READ_ROWS_IN_ROW_GROUP_TOTAL
1519            .with_label_values(&["bloom_filter_index_filtered"])
1520            .inc_by(self.rows_bloom_filtered as u64);
1521        READ_ROWS_IN_ROW_GROUP_TOTAL
1522            .with_label_values(&["vector_index_filtered"])
1523            .inc_by(self.rows_vector_filtered as u64);
1524    }
1525
1526    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1527        match index_type {
1528            INDEX_TYPE_FULLTEXT => {
1529                self.rg_fulltext_filtered += row_group_count;
1530                self.rows_fulltext_filtered += row_count;
1531            }
1532            INDEX_TYPE_INVERTED => {
1533                self.rg_inverted_filtered += row_group_count;
1534                self.rows_inverted_filtered += row_count;
1535            }
1536            INDEX_TYPE_BLOOM => {
1537                self.rg_bloom_filtered += row_group_count;
1538                self.rows_bloom_filtered += row_count;
1539            }
1540            INDEX_TYPE_VECTOR => {
1541                self.rg_vector_filtered += row_group_count;
1542                self.rows_vector_filtered += row_count;
1543            }
1544            _ => {}
1545        }
1546    }
1547}
1548
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    /// Offsets that fall into three different row groups select all three of them.
    #[test]
    fn test_vector_selection_from_offsets() {
        // Row groups of 4 rows: offsets 0 and 1 -> group 0, 5 -> group 1, 9 -> group 2.
        let selection = vector_selection_from_offsets(vec![0, 1, 5, 9], 4, 3).unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        for row_group in 0..3 {
            assert!(selection.contains_non_empty_row_group(row_group));
        }
    }

    /// An offset that does not fit in `u32` is rejected.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let too_large = u64::from(u32::MAX) + 1;
        let result = vector_selection_from_offsets(vec![0, 7, too_large], 4, 2);

        assert!(result.is_err());
    }

    /// Applying a vector selection records the filtered row groups and rows.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        // Two full row groups of 4 rows each.
        let mut output = RowGroupSelection::new(row_group_size, 8);
        let vector_selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &vector_selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        // Only row 1 of row group 0 survives: one row group and seven rows are filtered.
        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1600
/// Counters and timings collected while loading parquet metadata.
#[derive(Clone, Copy, Default)]
pub struct MetadataCacheMetrics {
    /// How many times parquet metadata was served from the memory cache.
    pub mem_cache_hit: usize,
    /// How many times parquet metadata was served from the file cache.
    pub file_cache_hit: usize,
    /// How many times parquet metadata was found in no cache.
    pub cache_miss: usize,
    /// Time spent loading parquet metadata.
    pub metadata_load_cost: Duration,
    /// How many read operations were performed.
    pub num_reads: usize,
    /// Total number of bytes read from storage.
    pub bytes_read: u64,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    /// Writes the metrics as a compact JSON-like object.
    ///
    /// Empty metrics (see [`MetadataCacheMetrics::is_empty`]) are written as `{}`.
    /// Otherwise `metadata_load_cost` is always written and each counter is
    /// written only when it is greater than zero.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            return f.write_str("{}");
        }

        write!(
            f,
            "{{\"metadata_load_cost\":\"{:?}\"",
            self.metadata_load_cost
        )?;

        // Counters in output order. The `usize` counters are widened to `u64`
        // so that all of them can share one loop.
        let counters = [
            ("mem_cache_hit", self.mem_cache_hit as u64),
            ("file_cache_hit", self.file_cache_hit as u64),
            ("cache_miss", self.cache_miss as u64),
            ("num_reads", self.num_reads as u64),
            ("bytes_read", self.bytes_read),
        ];
        for (name, value) in counters {
            if value > 0 {
                write!(f, ", \"{name}\":{value}")?;
            }
        }

        f.write_str("}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true if no metadata load time has been recorded.
    ///
    /// Only `metadata_load_cost` is inspected; the counters are not consulted.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost == Duration::ZERO
    }

    /// Accumulates every field of `other` into `self`.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        let MetadataCacheMetrics {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *other;

        self.mem_cache_hit += mem_cache_hit;
        self.file_cache_hit += file_cache_hit;
        self.cache_miss += cache_miss;
        self.metadata_load_cost += metadata_load_cost;
        self.num_reads += num_reads;
        self.bytes_read += bytes_read;
    }
}
1672
1673/// Parquet reader metrics.
1674#[derive(Debug, Default, Clone)]
1675pub struct ReaderMetrics {
1676    /// Filtered row groups and rows metrics.
1677    pub(crate) filter_metrics: ReaderFilterMetrics,
1678    /// Duration to build the parquet reader.
1679    pub(crate) build_cost: Duration,
1680    /// Duration to scan the reader.
1681    pub(crate) scan_cost: Duration,
1682    /// Number of record batches read.
1683    pub(crate) num_record_batches: usize,
1684    /// Number of batches decoded.
1685    pub(crate) num_batches: usize,
1686    /// Number of rows read.
1687    pub(crate) num_rows: usize,
1688    /// Metrics for parquet metadata cache.
1689    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
1690    /// Optional metrics for page/row group fetch operations.
1691    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
1692    /// Memory size of metadata loaded for building file ranges.
1693    pub(crate) metadata_mem_size: isize,
1694    /// Number of file range builders created.
1695    pub(crate) num_range_builders: isize,
1696}
1697
1698impl ReaderMetrics {
1699    /// Adds `other` metrics to this metrics.
1700    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1701        self.filter_metrics.merge_from(&other.filter_metrics);
1702        self.build_cost += other.build_cost;
1703        self.scan_cost += other.scan_cost;
1704        self.num_record_batches += other.num_record_batches;
1705        self.num_batches += other.num_batches;
1706        self.num_rows += other.num_rows;
1707        self.metadata_cache_metrics
1708            .merge_from(&other.metadata_cache_metrics);
1709        if let Some(other_fetch) = &other.fetch_metrics {
1710            if let Some(self_fetch) = &self.fetch_metrics {
1711                self_fetch.merge_from(other_fetch);
1712            } else {
1713                self.fetch_metrics = Some(other_fetch.clone());
1714            }
1715        }
1716        self.metadata_mem_size += other.metadata_mem_size;
1717        self.num_range_builders += other.num_range_builders;
1718    }
1719
1720    /// Reports total rows.
1721    pub(crate) fn observe_rows(&self, read_type: &str) {
1722        READ_ROWS_TOTAL
1723            .with_label_values(&[read_type])
1724            .inc_by(self.num_rows as u64);
1725    }
1726}
1727
1728/// Builder to build a parquet record batch stream for a row group.
1729pub(crate) struct RowGroupReaderBuilder {
1730    /// SST file to read.
1731    ///
1732    /// Holds the file handle to avoid the file purge it.
1733    file_handle: FileHandle,
1734    /// Path of the file.
1735    file_path: String,
1736    /// Metadata of the parquet file.
1737    parquet_meta: Arc<ParquetMetaData>,
1738    /// Arrow reader metadata for building async stream.
1739    arrow_metadata: ArrowReaderMetadata,
1740    /// Projected output schema aligned with `projection.projected_root_presence`.
1741    output_schema: SchemaRef,
1742    /// Object store as an Operator.
1743    object_store: ObjectStore,
1744    /// Projection mask.
1745    projection: ProjectionMaskPlan,
1746    /// Whether projected read columns include nested paths.
1747    has_nested_projection: bool,
1748    /// Cache.
1749    cache_strategy: CacheStrategy,
1750    /// Pre-built prefilter state. `None` if prefiltering is not applicable.
1751    prefilter_builder: Option<PrefilterContextBuilder>,
1752}
1753
1754/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
1755/// needed for prefiltering decisions.
1756pub(crate) struct RowGroupBuildContext<'a> {
1757    /// Index of the row group to read.
1758    pub(crate) row_group_idx: usize,
1759    /// Row selection for the row group. `None` means all rows.
1760    pub(crate) row_selection: Option<RowSelection>,
1761    /// Metrics for tracking fetch operations.
1762    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
1763}
1764
1765impl RowGroupReaderBuilder {
1766    /// Path of the file to read.
1767    pub(crate) fn file_path(&self) -> &str {
1768        &self.file_path
1769    }
1770
1771    /// Handle of the file to read.
1772    pub(crate) fn file_handle(&self) -> &FileHandle {
1773        &self.file_handle
1774    }
1775
1776    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1777        &self.parquet_meta
1778    }
1779
1780    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1781        &self.cache_strategy
1782    }
1783
1784    pub(crate) fn has_predicate_prefilter(&self) -> bool {
1785        self.prefilter_builder.is_some()
1786    }
1787
1788    /// Builds a parquet record batch stream to read the row group at `row_group_idx`.
1789    ///
1790    /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
1791    /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
1792    /// 2. Reads the full projection with the refined row selection
1793    ///
1794    /// The prefilter pass is *best-effort pruning*, not the precise filter for the query.
1795    /// Predicates that cannot be lowered to prefilter columns (column not projected,
1796    /// expression not supported, etc.) are silently skipped. Correctness rests on the
1797    /// DataFusion `FilterExec` above this reader, which always re-applies the original
1798    /// predicate. Tag and timestamp predicates that flow through [`SimpleFilterEvaluator`]
1799    /// are an exception — the engine enforces them precisely, so the prefilter pass is the
1800    /// only place they execute. See [`build_reader_filter_plan`] for the bucketing rules.
1801    ///
1802    /// When the prefilter result selects no rows, the second read still issues but
1803    /// parquet-rs short-circuits before any column-chunk IO: the row-group state machine
1804    /// jumps to `Finished` once it sees `num_rows_selected() == 0`, so no fast path is
1805    /// added here.
1806    pub(crate) async fn build(
1807        &self,
1808        build_ctx: RowGroupBuildContext<'_>,
1809    ) -> Result<ProjectedRecordBatchStream> {
1810        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1811
1812        let Some(mut prefilter_ctx) = prefilter_ctx else {
1813            // No prefilter applicable, build stream with full projection.
1814            let stream = self
1815                .build_with_projection(
1816                    build_ctx.row_group_idx,
1817                    build_ctx.row_selection,
1818                    self.projection.mask.clone(),
1819                    build_ctx.fetch_metrics,
1820                )
1821                .await?;
1822            return self.make_projected_stream(stream);
1823        };
1824
1825        let prefilter_start = Instant::now();
1826        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1827        if let Some(metrics) = build_ctx.fetch_metrics {
1828            let mut data = metrics.data.lock().unwrap();
1829            data.prefilter_cost += prefilter_start.elapsed();
1830            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1831        }
1832
1833        let refined_selection = Some(prefilter_result.refined_selection);
1834
1835        let stream = self
1836            .build_with_projection(
1837                build_ctx.row_group_idx,
1838                refined_selection,
1839                self.projection.mask.clone(),
1840                build_ctx.fetch_metrics,
1841            )
1842            .await?;
1843        self.make_projected_stream(stream)
1844    }
1845
1846    fn make_projected_stream(
1847        &self,
1848        stream: ProjectedRecordBatchStream,
1849    ) -> Result<ProjectedRecordBatchStream> {
1850        if !self.has_nested_projection {
1851            return Ok(stream);
1852        }
1853
1854        Ok(NestedSchemaAligner::new(
1855            stream,
1856            self.projection.projected_root_presence.clone(),
1857            self.output_schema.clone(),
1858        )?
1859        .boxed())
1860    }
1861
1862    /// Builds a parquet record batch stream with a custom projection mask.
1863    pub(crate) async fn build_with_projection(
1864        &self,
1865        row_group_idx: usize,
1866        row_selection: Option<RowSelection>,
1867        projection: ProjectionMask,
1868        fetch_metrics: Option<&ParquetFetchMetrics>,
1869    ) -> Result<ProjectedRecordBatchStream> {
1870        let range_fetcher = SstParquetRangeFetcher::new(
1871            self.file_handle.file_id(),
1872            self.file_path.clone(),
1873            self.object_store.clone(),
1874            self.cache_strategy.clone(),
1875            row_group_idx,
1876            fetch_metrics.cloned(),
1877        );
1878
1879        build_sst_parquet_record_batch_stream(
1880            self.arrow_metadata.clone(),
1881            row_group_idx,
1882            row_selection,
1883            projection,
1884            range_fetcher,
1885            self.file_path.clone(),
1886            DEFAULT_READ_BATCH_SIZE,
1887        )
1888    }
1889}
1890
1891#[derive(Clone)]
1892/// The filter to evaluate or the prune result of the default value.
1893pub(crate) enum MaybeFilter {
1894    /// The filter to evaluate.
1895    Filter(SimpleFilterEvaluator),
1896    /// The filter matches the default value.
1897    Matched,
1898    /// The filter is pruned.
1899    Pruned,
1900}
1901
1902impl MaybeFilter {
1903    /// Returns the inner filter when it is available.
1904    pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1905        match self {
1906            MaybeFilter::Filter(filter) => Some(filter),
1907            MaybeFilter::Matched | MaybeFilter::Pruned => None,
1908        }
1909    }
1910}
1911
1912#[derive(Clone)]
1913/// Context to evaluate the column filter for a parquet file.
1914pub(crate) struct SimpleFilterContext {
1915    /// Filter to evaluate.
1916    filter: MaybeFilter,
1917    /// Debug string of the original logical expression.
1918    expr_str: String,
1919    /// Id of the column to evaluate.
1920    column_id: ColumnId,
1921    /// Semantic type of the column.
1922    semantic_type: SemanticType,
1923}
1924
1925impl SimpleFilterContext {
1926    /// Creates a context for the `expr`.
1927    ///
1928    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1929    /// expected metadata.
1930    pub(crate) fn new_opt(
1931        sst_meta: &RegionMetadataRef,
1932        expected_meta: Option<&RegionMetadata>,
1933        expr: &Expr,
1934    ) -> Option<Self> {
1935        let filter = SimpleFilterEvaluator::try_new(expr)?;
1936        let expr_str = format!("{expr:?}");
1937        let (column_metadata, maybe_filter) = match expected_meta {
1938            Some(meta) => {
1939                // Gets the column metadata from the expected metadata.
1940                let column = meta.column_by_name(filter.column_name())?;
1941                // Checks if the column is present in the SST metadata. We still uses the
1942                // column from the expected metadata.
1943                match sst_meta.column_by_id(column.column_id) {
1944                    Some(sst_column) => {
1945                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1946                        // Schema evolution can make field columns with the same id have
1947                        // different concrete data types across SSTs. In that case,
1948                        // evaluating this simple filter against current SST column may
1949                        // raise an invalid cross-type comparison error (e.g. Float64 == Utf8).
1950                        let maybe_filter = if sst_column.column_schema.data_type
1951                            == column.column_schema.data_type
1952                        {
1953                            MaybeFilter::Filter(filter)
1954                        } else {
1955                            // Altering tag or timestamp column types is not allowed,
1956                            // so only field columns can reach this branch.
1957                            debug_assert_eq!(column.semantic_type, SemanticType::Field);
1958                            return None;
1959                        };
1960                        (column, maybe_filter)
1961                    }
1962                    None => {
1963                        // If the column is not present in the SST metadata, we evaluate the filter
1964                        // against the default value of the column.
1965                        // If we can't evaluate the filter, we return None.
1966                        if pruned_by_default(&filter, column)? {
1967                            (column, MaybeFilter::Pruned)
1968                        } else {
1969                            (column, MaybeFilter::Matched)
1970                        }
1971                    }
1972                }
1973            }
1974            None => {
1975                let column = sst_meta.column_by_name(filter.column_name())?;
1976                (column, MaybeFilter::Filter(filter))
1977            }
1978        };
1979
1980        Some(Self {
1981            filter: maybe_filter,
1982            expr_str,
1983            column_id: column_metadata.column_id,
1984            semantic_type: column_metadata.semantic_type,
1985        })
1986    }
1987
1988    /// Returns the filter to evaluate.
1989    pub(crate) fn filter(&self) -> &MaybeFilter {
1990        &self.filter
1991    }
1992
1993    /// Returns the original logical expression string.
1994    pub(crate) fn expr_str(&self) -> &str {
1995        &self.expr_str
1996    }
1997
1998    /// Returns the column id.
1999    pub(crate) fn column_id(&self) -> ColumnId {
2000        self.column_id
2001    }
2002
2003    /// Returns the semantic type of the column.
2004    pub(crate) fn semantic_type(&self) -> SemanticType {
2005        self.semantic_type
2006    }
2007}
2008
2009/// Context to evaluate a physical expression for a parquet file.
2010#[derive(Clone)]
2011pub(crate) struct PhysicalFilterContext {
2012    /// Filter to evaluate.
2013    filter: Arc<dyn PhysicalExpr>,
2014    /// Debug string of the original logical expression.
2015    expr_str: String,
2016    /// Id of the column to evaluate.
2017    column_id: ColumnId,
2018    /// Name of the column to evaluate.
2019    column_name: String,
2020    /// Semantic type of the column.
2021    semantic_type: SemanticType,
2022    /// Schema containing only the referenced column.
2023    schema: SchemaRef,
2024    /// Whether the original logical expression is immutable across queries.
2025    immutable: bool,
2026}
2027
2028impl PhysicalFilterContext {
2029    /// Creates a context for the `expr`.
2030    ///
2031    /// Returns None if the expression doesn't reference exactly one column or the
2032    /// column to filter doesn't exist in the SST metadata or the expected metadata.
2033    pub(crate) fn new_opt(
2034        sst_meta: &RegionMetadataRef,
2035        expected_meta: Option<&RegionMetadata>,
2036        read_format: &FlatReadFormat,
2037        expr: &Expr,
2038    ) -> Option<Self> {
2039        if !Self::is_prefilter_candidate(expr) {
2040            return None;
2041        }
2042        let expr_str = format!("{expr:?}");
2043        let column_name = Self::single_column_name(expr)?;
2044        let column_metadata = match expected_meta {
2045            Some(meta) => {
2046                let column = meta.column_by_name(&column_name)?;
2047                let sst_column = sst_meta.column_by_id(column.column_id)?;
2048                // Physical expr requires the column name to match the SST column name.
2049                if sst_column.column_schema.name != column_name {
2050                    return None;
2051                }
2052                column
2053            }
2054            None => sst_meta.column_by_name(&column_name)?,
2055        };
2056
2057        // The column must be present in the projected arrow schema for the
2058        // prefilter to be able to read it.
2059        let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
2060        let field = field.clone();
2061        let schema = Arc::new(ArrowSchema::new(vec![field]));
2062        let physical_expr = Predicate::to_physical_expr(expr, &schema)
2063            .inspect_err(|e| {
2064                error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
2065            })
2066            .ok()?;
2067        let immutable = expr_is_immutable(expr);
2068
2069        Some(Self {
2070            filter: physical_expr,
2071            expr_str,
2072            column_id: column_metadata.column_id,
2073            column_name,
2074            semantic_type: column_metadata.semantic_type,
2075            schema,
2076            immutable,
2077        })
2078    }
2079
2080    /// Returns true if the expression is a variant we want to evaluate as a
2081    /// physical prefilter. Binary exprs are intentionally excluded because
2082    /// [`SimpleFilterEvaluator`] already handles them.
2083    // TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations)
2084    fn is_prefilter_candidate(expr: &Expr) -> bool {
2085        if !matches!(
2086            expr,
2087            Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
2088        ) {
2089            return false;
2090        }
2091
2092        // If any functions are found in the expr, it will be not considered as worthy enough to
2093        // be evaluated in the prefilter. At last, prefilter reads the Parquet files one more time.
2094        !expr
2095            .exists(|e| Ok(matches!(e, Expr::ScalarFunction(_))))
2096            .unwrap_or(false)
2097    }
2098
2099    fn single_column_name(expr: &Expr) -> Option<String> {
2100        let mut columns = HashSet::new();
2101        if expr_to_columns(expr, &mut columns).is_err() {
2102            return None;
2103        }
2104        if columns.len() != 1 {
2105            return None;
2106        }
2107        columns.iter().next().map(|column| column.name.clone())
2108    }
2109
2110    /// Returns the filter to evaluate.
2111    pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
2112        &self.filter
2113    }
2114
2115    /// Returns the original logical expression string.
2116    pub(crate) fn expr_str(&self) -> &str {
2117        &self.expr_str
2118    }
2119
2120    /// Returns the column id.
2121    pub(crate) fn column_id(&self) -> ColumnId {
2122        self.column_id
2123    }
2124
2125    /// Returns the column name.
2126    pub(crate) fn column_name(&self) -> &str {
2127        &self.column_name
2128    }
2129
2130    /// Returns the semantic type of the column.
2131    pub(crate) fn semantic_type(&self) -> SemanticType {
2132        self.semantic_type
2133    }
2134
2135    /// Returns the schema containing only the referenced column.
2136    pub(crate) fn schema(&self) -> &SchemaRef {
2137        &self.schema
2138    }
2139
2140    /// Returns true if the original logical expression is immutable across queries.
2141    pub(crate) fn is_immutable(&self) -> bool {
2142        self.immutable
2143    }
2144}
2145
2146fn expr_is_immutable(expr: &Expr) -> bool {
2147    let mut is_immutable = true;
2148    let _ = expr.apply(|expr| match expr {
2149        Expr::ScalarFunction(function)
2150            if function.func.signature().volatility != Volatility::Immutable =>
2151        {
2152            is_immutable = false;
2153            Ok(TreeNodeRecursion::Stop)
2154        }
2155        Expr::ScalarVariable(_, _) => {
2156            is_immutable = false;
2157            Ok(TreeNodeRecursion::Stop)
2158        }
2159        _ => Ok(TreeNodeRecursion::Continue),
2160    });
2161    is_immutable
2162}
2163
2164/// Prune a column by its default value.
2165/// Returns false if we can't create the default value or evaluate the filter.
2166fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
2167    let value = column.column_schema.create_default().ok().flatten()?;
2168    let scalar_value = value
2169        .try_to_scalar_value(&column.column_schema.data_type)
2170        .ok()?;
2171    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
2172    Some(!matches)
2173}
2174
2175/// Parquet batch reader to read our SST format.
2176pub struct ParquetReader {
2177    /// File range context.
2178    context: FileRangeContextRef,
2179    /// Row group selection to read.
2180    selection: RowGroupSelection,
2181    /// Reader of current row group.
2182    reader: Option<FlatPruneReader>,
2183    /// Metrics for tracking row group fetch operations.
2184    fetch_metrics: ParquetFetchMetrics,
2185}
2186
2187impl ParquetReader {
2188    #[tracing::instrument(
2189        skip_all,
2190        fields(
2191            region_id = %self.context.reader_builder().file_handle.region_id(),
2192            file_id = %self.context.reader_builder().file_handle.file_id()
2193        )
2194    )]
2195    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2196        loop {
2197            if let Some(reader) = &mut self.reader {
2198                if let Some(batch) = reader.next_batch().await? {
2199                    return Ok(Some(batch));
2200                }
2201                self.reader = None;
2202                continue;
2203            }
2204
2205            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
2206                return Ok(None);
2207            };
2208
2209            let skip_fields = self.context.pre_filter_mode().skip_fields();
2210            let parquet_reader = self
2211                .context
2212                .reader_builder()
2213                .build(self.context.build_context(
2214                    row_group_idx,
2215                    Some(row_selection),
2216                    Some(&self.fetch_metrics),
2217                ))
2218                .await?;
2219            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
2220                self.context.clone(),
2221                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
2222                skip_fields,
2223            ));
2224        }
2225    }
2226    /// Creates a new reader.
2227    #[tracing::instrument(
2228        skip_all,
2229        fields(
2230            region_id = %context.reader_builder().file_handle.region_id(),
2231            file_id = %context.reader_builder().file_handle.file_id()
2232        )
2233    )]
2234    pub(crate) async fn new(
2235        context: FileRangeContextRef,
2236        mut selection: RowGroupSelection,
2237    ) -> Result<Self> {
2238        let fetch_metrics = ParquetFetchMetrics::default();
2239        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
2240            let skip_fields = context.pre_filter_mode().skip_fields();
2241            let parquet_reader = context
2242                .reader_builder()
2243                .build(context.build_context(
2244                    row_group_idx,
2245                    Some(row_selection),
2246                    Some(&fetch_metrics),
2247                ))
2248                .await?;
2249            Some(FlatPruneReader::new_with_row_group_reader(
2250                context.clone(),
2251                FlatRowGroupReader::new(context.clone(), parquet_reader),
2252                skip_fields,
2253            ))
2254        } else {
2255            None
2256        };
2257
2258        Ok(ParquetReader {
2259            context,
2260            selection,
2261            reader,
2262            fetch_metrics,
2263        })
2264    }
2265
2266    /// Returns the metadata of the SST.
2267    pub fn metadata(&self) -> &RegionMetadataRef {
2268        self.context.read_format().metadata()
2269    }
2270
2271    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2272        self.context.reader_builder().parquet_meta.clone()
2273    }
2274}
2275
2276/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
2277pub(crate) struct FlatRowGroupReader {
2278    /// Context for file ranges.
2279    context: FileRangeContextRef,
2280    /// Inner parquet record batch stream.
2281    stream: ProjectedRecordBatchStream,
2282    /// Cached sequence array to override sequences.
2283    override_sequence: Option<ArrayRef>,
2284}
2285
2286impl FlatRowGroupReader {
2287    /// Creates a new flat reader from file range.
2288    pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2289        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2290        let override_sequence = context
2291            .read_format()
2292            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2293
2294        Self {
2295            context,
2296            stream,
2297            override_sequence,
2298        }
2299    }
2300
2301    /// Returns the next RecordBatch.
2302    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2303        match self.stream.next().await {
2304            Some(batch_result) => {
2305                let record_batch = batch_result?;
2306
2307                let record_batch = self
2308                    .context
2309                    .read_format()
2310                    .convert_batch(record_batch, self.override_sequence.as_ref())?;
2311                Ok(Some(record_batch))
2312            }
2313            None => Ok(None),
2314        }
2315    }
2316}
2317
2318#[cfg(test)]
2319mod tests {
2320    use std::any::Any;
2321    use std::fmt::{Debug, Formatter};
2322    use std::sync::{Arc, LazyLock};
2323
2324    use common_error::ext::WhateverResult;
2325    use common_function::scalars::json::json_get::JsonGetWithType;
2326    use common_function::scalars::udf::create_udf;
2327    use common_recordbatch::ext::RecordBatchExt;
2328    use datafusion::arrow::datatypes::DataType;
2329    use datafusion_common::ScalarValue;
2330    use datafusion_expr::expr::ScalarFunction;
2331    use datafusion_expr::{
2332        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2333        col, lit,
2334    };
2335    use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, StructArray};
2336    use datatypes::arrow::datatypes::{Fields, Schema};
2337    use datatypes::arrow::record_batch::RecordBatch;
2338    use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
2339    use datatypes::prelude::ConcreteDataType;
2340    use datatypes::schema::ColumnSchema;
2341    use object_store::services::Memory;
2342    use parquet::arrow::ArrowWriter;
2343    use parquet::file::properties::WriterProperties;
2344    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2345    use store_api::region_request::PathType;
2346    use store_api::storage::RegionId;
2347    use table::predicate::Predicate;
2348
2349    use super::*;
2350    use crate::sst::parquet::metadata::MetadataLoader;
2351    use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
2352    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2353
2354    #[test]
2355    fn test_skip_prefilter_for_json_get() -> WhateverResult<()> {
2356        fn json_get_expr(base: Expr, path: &str) -> Expr {
2357            let json_get = Arc::new(create_udf(Arc::new(JsonGetWithType::default())));
2358            Expr::ScalarFunction(ScalarFunction::new_udf(json_get, vec![base, lit(path)]))
2359        }
2360
2361        let metadata = Arc::new(sst_region_metadata());
2362        let format = FlatReadFormat::new(
2363            metadata.clone(),
2364            ReadColumns::from_deduped_column_ids(
2365                metadata.column_metadatas.iter().map(|c| c.column_id),
2366            ),
2367            None,
2368            "test",
2369            true,
2370        )?;
2371        let new_filter =
2372            |expr: Expr| PhysicalFilterContext::new_opt(&metadata, None, &format, &expr);
2373
2374        let json_get = || json_get_expr(col("field_0"), "a.b");
2375
2376        let regular_expr = col("field_0").is_null();
2377        assert!(new_filter(regular_expr).is_some());
2378
2379        let is_null = json_get().is_null();
2380        assert!(new_filter(is_null).is_none());
2381
2382        let is_not_null = json_get().is_not_null();
2383        assert!(new_filter(is_not_null).is_none());
2384
2385        let in_list = json_get().in_list(vec![lit("value")], false);
2386        assert!(new_filter(in_list).is_none());
2387
2388        let in_list_nested = col("field_0").in_list(vec![json_get()], false);
2389        assert!(new_filter(in_list_nested).is_none());
2390
2391        let between = json_get().between(lit(1_u64), lit(10_u64));
2392        assert!(new_filter(between).is_none());
2393
2394        let between_nested = col("field_0").between(json_get(), lit(10_u64));
2395        assert!(new_filter(between_nested).is_none());
2396
2397        Ok(())
2398    }
2399
2400    #[tokio::test]
2401    async fn test_nested_projection_reads_partial_json2_physical_fields() -> WhateverResult<()> {
2402        // Write a full JSON2-like Arrow struct:
2403        // j: { a: { x: int, y: string }, b: string }.
2404        // The test later requests only j.a.x and verifies that the physical Parquet projection
2405        // does not materialize j.a.y or j.b.
2406
2407        let xy_fields = Fields::from(vec![
2408            Arc::new(Field::new("x", DataType::Int64, true)),
2409            Arc::new(Field::new("y", DataType::Utf8, true)),
2410        ]);
2411        let a_field = Arc::new(Field::new("a", DataType::Struct(xy_fields.clone()), true));
2412        let b_field = Arc::new(Field::new("b", DataType::Utf8, true));
2413        let json_fields = Fields::from(vec![a_field, b_field]);
2414        let json_field = Field::new("j", DataType::Struct(json_fields.clone()), true)
2415            .with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default())));
2416        let schema = Arc::new(Schema::new(vec![json_field]));
2417
2418        let a_array = Arc::new(StructArray::new(
2419            xy_fields,
2420            vec![
2421                Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef,
2422                Arc::new(StringArray::from_iter_values(["x1", "x2", "x3"])) as ArrayRef,
2423            ],
2424            None,
2425        )) as ArrayRef;
2426        let b_array = Arc::new(StringArray::from_iter_values(["b1", "b2", "b3"])) as ArrayRef;
2427        let j_array =
2428            Arc::new(StructArray::new(json_fields, vec![a_array, b_array], None)) as ArrayRef;
2429        let columns = vec![j_array];
2430
2431        let batch = RecordBatch::try_new(schema, columns).map_err(|e| e.to_string())?;
2432
2433        // Persist the complete nested schema to an in-memory Parquet file so the projection is
2434        // exercised through parquet-rs rather than a mock.
2435
2436        let object_store = ObjectStore::new(Memory::default())
2437            .map_err(|e| e.to_string())?
2438            .finish();
2439        let file_handle = sst_file_handle(0, 1);
2440        let file_path = file_handle.file_path("test_table", PathType::Bare);
2441
2442        let mut parquet_bytes = Vec::new();
2443        ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None)
2444            .and_then(|mut w| {
2445                w.write(&batch)?;
2446                Ok(w)
2447            })
2448            .and_then(|w| w.close())
2449            .map_err(|e| e.to_string())?;
2450        let file_size = parquet_bytes.len() as u64;
2451        object_store
2452            .write(&file_path, parquet_bytes)
2453            .await
2454            .map_err(|e| e.to_string())?;
2455
2456        let mut cache_metrics = MetadataCacheMetrics::default();
2457        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2458        let parquet_meta = loader.load(&mut cache_metrics).await?;
2459        let parquet_schema = parquet_meta.file_metadata().schema_descr();
2460        assert_eq!(3, parquet_schema.num_columns());
2461
2462        // Ask Parquet to read only the deepest requested JSON2 path. This should select the single
2463        // leaf j.a.x and avoid both sibling leaves j.a.y and j.b.
2464
2465        let projection =
2466            ParquetReadColumns::from_deduped(vec![ParquetReadColumn::new(0).with_nested_paths(
2467                vec![vec!["j".to_string(), "a".to_string(), "x".to_string()]],
2468            )]);
2469        let projection_plan = build_projection_plan(&projection, parquet_schema);
2470        assert_eq!(vec![true], projection_plan.projected_root_presence);
2471        assert_eq!(
2472            projection_plan.mask,
2473            ProjectionMask::leaves(parquet_schema, vec![0])
2474        );
2475
2476        // Read through the low-level stream directly.
2477
2478        let arrow_metadata =
2479            ArrowReaderMetadata::try_new(Arc::new(parquet_meta), ArrowReaderOptions::new())
2480                .map_err(|e| e.to_string())?;
2481        let fetcher = SstParquetRangeFetcher::new(
2482            file_handle.file_id(),
2483            file_path.clone(),
2484            object_store,
2485            CacheStrategy::Disabled,
2486            0,
2487            None,
2488        );
2489        let mut stream = build_sst_parquet_record_batch_stream(
2490            arrow_metadata,
2491            0,
2492            None,
2493            projection_plan.mask,
2494            fetcher,
2495            file_path,
2496            1024,
2497        )?;
2498
2499        let Some(batch) = stream.next().await.transpose()? else {
2500            unreachable!()
2501        };
2502        let expected = r#"
2503+-------------+
2504| j           |
2505+-------------+
2506| {a: {x: 1}} |
2507| {a: {x: 2}} |
2508| {a: {x: 3}} |
2509+-------------+
2510"#;
2511        assert_eq!(batch.pretty_print(), expected.trim());
2512        Ok(())
2513    }
2514
2515    #[tokio::test(flavor = "current_thread")]
2516    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2517        #[derive(Eq, PartialEq, Hash)]
2518        struct PanicDebugUdf;
2519
2520        impl Debug for PanicDebugUdf {
2521            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2522                panic!("minmax predicate key should not format exprs when cache is disabled");
2523            }
2524        }
2525
2526        impl ScalarUDFImpl for PanicDebugUdf {
2527            fn as_any(&self) -> &dyn Any {
2528                self
2529            }
2530
2531            fn name(&self) -> &str {
2532                "panic_debug_udf"
2533            }
2534
2535            fn signature(&self) -> &Signature {
2536                static SIGNATURE: LazyLock<Signature> =
2537                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2538                &SIGNATURE
2539            }
2540
2541            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2542                Ok(DataType::Int64)
2543            }
2544
2545            fn invoke_with_args(
2546                &self,
2547                _args: ScalarFunctionArgs,
2548            ) -> datafusion_common::Result<ColumnarValue> {
2549                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2550            }
2551        }
2552
2553        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2554        let file_handle = sst_file_handle(0, 1);
2555        let table_dir = "test_table".to_string();
2556        let path_type = PathType::Bare;
2557        let file_path = file_handle.file_path(&table_dir, path_type);
2558
2559        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2560        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2561        let mut parquet_bytes = Vec::new();
2562        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2563        writer.write(&batch).unwrap();
2564        writer.close().unwrap();
2565        let file_size = parquet_bytes.len() as u64;
2566        object_store.write(&file_path, parquet_bytes).await.unwrap();
2567
2568        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2569        let read_format = FlatReadFormat::new(
2570            region_metadata.clone(),
2571            ReadColumns::from_deduped_column_ids(
2572                region_metadata
2573                    .column_metadatas
2574                    .iter()
2575                    .map(|column| column.column_id),
2576            ),
2577            None,
2578            &file_path,
2579            false,
2580        )
2581        .unwrap();
2582
2583        let mut cache_metrics = MetadataCacheMetrics::default();
2584        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2585        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2586
2587        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2588        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2589            udf,
2590            vec![],
2591        ))]);
2592        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2593            .predicate(Some(predicate))
2594            .cache(CacheStrategy::Disabled);
2595
2596        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2597        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2598        let mut metrics = ReaderFilterMetrics::default();
2599        let selection = builder.row_groups_by_minmax(
2600            &read_format,
2601            &parquet_meta,
2602            row_group_size,
2603            total_row_count,
2604            &mut metrics,
2605            false,
2606        );
2607
2608        assert!(!selection.is_empty());
2609    }
2610
2611    #[test]
2612    fn test_expr_is_immutable_checks_scalar_function_volatility() {
2613        #[derive(Debug, PartialEq, Eq, Hash)]
2614        struct TestVolatilityUdf {
2615            name: String,
2616            signature: Signature,
2617        }
2618
2619        impl TestVolatilityUdf {
2620            fn new(name: &str, volatility: Volatility) -> Self {
2621                Self {
2622                    name: name.to_string(),
2623                    signature: Signature::variadic_any(volatility),
2624                }
2625            }
2626        }
2627
2628        impl ScalarUDFImpl for TestVolatilityUdf {
2629            fn as_any(&self) -> &dyn Any {
2630                self
2631            }
2632
2633            fn name(&self) -> &str {
2634                &self.name
2635            }
2636
2637            fn signature(&self) -> &Signature {
2638                &self.signature
2639            }
2640
2641            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2642                Ok(DataType::Int64)
2643            }
2644
2645            fn invoke_with_args(
2646                &self,
2647                _args: ScalarFunctionArgs,
2648            ) -> datafusion_common::Result<ColumnarValue> {
2649                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2650            }
2651        }
2652
2653        let expr = |name: &str, volatility| {
2654            Expr::ScalarFunction(ScalarFunction::new_udf(
2655                Arc::new(ScalarUDF::new_from_impl(TestVolatilityUdf::new(
2656                    name, volatility,
2657                ))),
2658                vec![],
2659            ))
2660        };
2661
2662        assert!(expr_is_immutable(&expr(
2663            "immutable_udf",
2664            Volatility::Immutable
2665        )));
2666        assert!(!expr_is_immutable(&expr("stable_udf", Volatility::Stable)));
2667        assert!(!expr_is_immutable(&expr(
2668            "volatile_udf",
2669            Volatility::Volatile
2670        )));
2671
2672        let scalar_variable = Expr::ScalarVariable(
2673            Arc::new(Field::new("@@version", DataType::Utf8, false)),
2674            vec!["@@version".to_string()],
2675        );
2676        assert!(!expr_is_immutable(&scalar_variable));
2677    }
2678
2679    #[tokio::test(flavor = "current_thread")]
2680    async fn test_has_row_level_selection() {
2681        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2682        let file_path = "row_level_selection.parquet";
2683
2684        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
2685        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2686        let props = WriterProperties::builder()
2687            .set_max_row_group_row_count(Some(3))
2688            .build();
2689        let mut parquet_bytes = Vec::new();
2690        let mut writer =
2691            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
2692        writer.write(&batch).unwrap();
2693        writer.close().unwrap();
2694        let file_size = parquet_bytes.len() as u64;
2695        object_store.write(file_path, parquet_bytes).await.unwrap();
2696
2697        let mut cache_metrics = MetadataCacheMetrics::default();
2698        let loader = MetadataLoader::new(object_store, file_path, file_size);
2699        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2700        assert_eq!(2, parquet_meta.num_row_groups());
2701
2702        let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
2703        assert!(!has_row_level_selection(&full_row_groups, &parquet_meta));
2704
2705        let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
2706        assert!(has_row_level_selection(&prefix_selection, &parquet_meta));
2707
2708        let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
2709        assert!(has_row_level_selection(&interior_selection, &parquet_meta));
2710    }
2711
2712    fn expected_metadata_with_reused_tag_name(
2713        old_metadata: &RegionMetadata,
2714    ) -> Arc<RegionMetadata> {
2715        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2716        builder
2717            .push_column_metadata(ColumnMetadata {
2718                column_schema: ColumnSchema::new(
2719                    "tag_0".to_string(),
2720                    ConcreteDataType::string_datatype(),
2721                    true,
2722                ),
2723                semantic_type: SemanticType::Tag,
2724                column_id: 10,
2725            })
2726            .push_column_metadata(ColumnMetadata {
2727                column_schema: ColumnSchema::new(
2728                    "tag_1".to_string(),
2729                    ConcreteDataType::string_datatype(),
2730                    true,
2731                ),
2732                semantic_type: SemanticType::Tag,
2733                column_id: 1,
2734            })
2735            .push_column_metadata(ColumnMetadata {
2736                column_schema: ColumnSchema::new(
2737                    "field_0".to_string(),
2738                    ConcreteDataType::uint64_datatype(),
2739                    true,
2740                ),
2741                semantic_type: SemanticType::Field,
2742                column_id: 2,
2743            })
2744            .push_column_metadata(ColumnMetadata {
2745                column_schema: ColumnSchema::new(
2746                    "ts".to_string(),
2747                    ConcreteDataType::timestamp_millisecond_datatype(),
2748                    false,
2749                ),
2750                semantic_type: SemanticType::Timestamp,
2751                column_id: 3,
2752            })
2753            .primary_key(vec![10, 1]);
2754
2755        Arc::new(builder.build().unwrap())
2756    }
2757
2758    #[test]
2759    fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
2760        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2761        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2762        let ctx = SimpleFilterContext::new_opt(
2763            &metadata,
2764            Some(expected_metadata.as_ref()),
2765            &col("tag_0").eq(lit("a")),
2766        )
2767        .unwrap();
2768        assert!(matches!(
2769            ctx.filter(),
2770            MaybeFilter::Matched | MaybeFilter::Pruned
2771        ));
2772    }
2773
2774    #[test]
2775    fn test_simple_filter_context_drops_mismatched_field_filter() {
2776        let (sst_metadata, latest_metadata) = mock_metadata();
2777        let ctx = SimpleFilterContext::new_opt(
2778            &sst_metadata,
2779            Some(latest_metadata.as_ref()),
2780            &col("field_0").eq(lit(1_i64)),
2781        );
2782
2783        assert!(ctx.is_none());
2784    }
2785
2786    fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
2787        let region_id = RegionId::new(1, 1);
2788        let make_tag_0 = || ColumnMetadata {
2789            column_schema: ColumnSchema::new(
2790                "tag_0".to_string(),
2791                ConcreteDataType::string_datatype(),
2792                true,
2793            ),
2794            semantic_type: SemanticType::Tag,
2795            column_id: 0,
2796        };
2797        let make_ts = || ColumnMetadata {
2798            column_schema: ColumnSchema::new(
2799                "ts".to_string(),
2800                ConcreteDataType::timestamp_millisecond_datatype(),
2801                false,
2802            ),
2803            semantic_type: SemanticType::Timestamp,
2804            column_id: 2,
2805        };
2806        let make_field_0 = |data_type| ColumnMetadata {
2807            column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
2808            semantic_type: SemanticType::Field,
2809            column_id: 1,
2810        };
2811
2812        let mut sst_builder = RegionMetadataBuilder::new(region_id);
2813        sst_builder
2814            .push_column_metadata(make_tag_0())
2815            .push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
2816            .push_column_metadata(make_ts())
2817            .primary_key(vec![0]);
2818        let sst_metadata = Arc::new(sst_builder.build().unwrap());
2819
2820        let mut expected_builder = RegionMetadataBuilder::new(region_id);
2821        expected_builder
2822            .push_column_metadata(make_tag_0())
2823            .push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
2824            .push_column_metadata(make_ts())
2825            .primary_key(vec![0]);
2826
2827        let expected_metadata = Arc::new(expected_builder.build().unwrap());
2828
2829        (sst_metadata, expected_metadata)
2830    }
2831
2832    #[test]
2833    fn test_physical_filter_context_skips_renamed_column() {
2834        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2835        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2836        let read_format = FlatReadFormat::new(
2837            metadata.clone(),
2838            ReadColumns::from_deduped_column_ids(
2839                metadata.column_metadatas.iter().map(|c| c.column_id),
2840            ),
2841            None,
2842            "test",
2843            true,
2844        )
2845        .unwrap();
2846
2847        let ctx = PhysicalFilterContext::new_opt(
2848            &metadata,
2849            Some(expected_metadata.as_ref()),
2850            &read_format,
2851            &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
2852        );
2853
2854        assert!(ctx.is_none());
2855    }
2856
2857    #[test]
2858    fn test_physical_filter_context_only_accepts_prefilter_candidates() {
2859        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2860        let read_format = FlatReadFormat::new(
2861            metadata.clone(),
2862            ReadColumns::from_deduped_column_ids(
2863                metadata.column_metadatas.iter().map(|c| c.column_id),
2864            ),
2865            None,
2866            "test",
2867            true,
2868        )
2869        .unwrap();
2870
2871        // InList is on the allowlist — should build a context.
2872        let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
2873        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
2874
2875        // NOT IN uses the same variant with `negated: true` — also accepted.
2876        let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
2877        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some());
2878
2879        // IS NULL / IS NOT NULL are accepted.
2880        let is_null = col("tag_0").is_null();
2881        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
2882        let is_not_null = col("tag_0").is_not_null();
2883        assert!(
2884            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
2885        );
2886
2887        // BETWEEN is accepted.
2888        let between = col("field_0").between(lit(1_u64), lit(10_u64));
2889        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
2890
2891        // Binary expr is handled by SimpleFilterEvaluator — rejected here.
2892        let binary = col("tag_0").eq(lit("a"));
2893        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
2894    }
2895
2896    fn write_test_parquet_with_pk_column(values: &[&[u8]]) -> bytes::Bytes {
2897        use datatypes::arrow::array::{
2898            BinaryArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
2899        };
2900        use datatypes::arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
2901        use store_api::storage::consts::{
2902            OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
2903        };
2904
2905        let n = values.len();
2906        let schema = Arc::new(ArrowSchema::new(vec![
2907            ArrowField::new(
2908                "ts",
2909                DataType::Timestamp(TimeUnit::Millisecond, None),
2910                false,
2911            ),
2912            ArrowField::new(PRIMARY_KEY_COLUMN_NAME, DataType::Binary, false),
2913            ArrowField::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
2914            ArrowField::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
2915        ]));
2916        let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0_i64; n]));
2917        let pk: ArrayRef = Arc::new(BinaryArray::from_iter_values(values.iter().copied()));
2918        let seq: ArrayRef = Arc::new(UInt64Array::from(vec![0_u64; n]));
2919        let op: ArrayRef = Arc::new(UInt8Array::from(vec![0_u8; n]));
2920        let batch = RecordBatch::try_new(schema.clone(), vec![ts, pk, seq, op]).unwrap();
2921
2922        let mut bytes = Vec::new();
2923        let mut writer = ArrowWriter::try_new(&mut bytes, schema, None).unwrap();
2924        writer.write(&batch).unwrap();
2925        writer.close().unwrap();
2926        bytes::Bytes::from(bytes)
2927    }
2928
2929    fn load_parquet_meta(bytes: bytes::Bytes) -> Arc<ParquetMetaData> {
2930        let builder =
2931            parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
2932        builder.metadata().clone()
2933    }
2934
2935    #[test]
2936    fn test_should_read_pk_as_binary_small_chunk_returns_false() {
2937        let bytes = write_test_parquet_with_pk_column(&[b"a", b"b", b"c"]);
2938        let meta = load_parquet_meta(bytes);
2939
2940        assert!(!should_read_pk_as_binary_with_limit(&meta, 1024));
2941    }
2942
2943    #[test]
2944    fn test_should_read_pk_as_binary_large_chunk_returns_true() {
2945        let owned: Vec<Vec<u8>> = (0..512u32)
2946            .map(|i| {
2947                let mut v = vec![0u8; 16];
2948                v[..4].copy_from_slice(&i.to_le_bytes());
2949                v
2950            })
2951            .collect();
2952        let refs: Vec<&[u8]> = owned.iter().map(|v| v.as_slice()).collect();
2953        let bytes = write_test_parquet_with_pk_column(&refs);
2954        let meta = load_parquet_meta(bytes);
2955
2956        assert!(should_read_pk_as_binary_with_limit(&meta, 1024));
2957    }
2958}