
mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

mod stream;

#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{error, tracing, warn};
use datafusion::physical_plan::PhysicalExpr;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream};
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::FlatPruneReader;
use crate::read::read_columns::ReadColumns;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::async_reader::SstAsyncFileReader;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::need_override_sequence;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::prefilter::{
    PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
};
use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::tag_maybe_to_dictionary_field;

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";

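/// Logs a warning when applying an index fails (or panics under test builds),
/// so callers can skip the failing index and continue reading without it.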
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
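///
/// A minimal usage sketch (hypothetical; `table_dir`, `path_type`, `file_handle`,
/// `object_store` and `predicate` are assumed to already exist in scope):
///
/// ```ignore
/// // `build()` returns `Ok(None)` when all row groups are pruned away.
/// let maybe_reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
///     .predicate(Some(predicate))
///     .build()
///     .await?;
/// ```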
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// The columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    read_cols: Option<ReadColumns>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    page_index_policy: PageIndexPolicy,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            read_cols: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            expected_metadata: None,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
            page_index_policy: Default::default(),
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
        self.read_cols = read_cols;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the vector index applier to the builder.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
        k: Option<usize>,
    ) -> Self {
        self.vector_index_applier = applier;
        self.vector_index_k = k;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Decodes primary key values eagerly when reading primary key format SSTs.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

    #[must_use]
    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.page_index_policy = page_index_policy;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<Option<ParquetReader>> {
        let mut metrics = ReaderMetrics::default();

        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
            return Ok(None);
        };
        ParquetReader::new(Arc::new(context), selection)
            .await
            .map(Some)
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        self.build_reader_input_inner(metrics).await
    }

    async fn build_reader_input_inner(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let (sst_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        let parquet_meta = sst_meta.parquet_metadata();
        let region_meta = sst_meta.region_metadata();
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref())
            .map(|expr| expr.as_str());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str,
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        // Skip auto convert when:
        // - compaction is enabled
        // - the region partition expr is the same as the file partition expr (no need to auto convert)
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Build a compaction projection helper when:
        // - compaction is enabled
        // - region partition expr differs from file partition expr
        // - flat format is enabled
        // - primary key encoding is sparse
        //
        // This is applied after row-group filtering to align batches with flat output schema
        // before compat handling.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        let read_cols = if let Some(read_cols) = &self.read_cols {
            read_cols.clone()
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            // Lists all column ids to read; we always use the expected metadata if possible.
            ReadColumns::from_deduped_column_ids(
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };
        let mut read_format = FlatReadFormat::new(
            region_meta.clone(),
            read_cols,
            Some(parquet_meta.file_metadata().schema_descr().num_columns()),
            &file_path,
            skip_auto_convert,
        )?;
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let parquet_read_cols = read_format.parquet_read_columns();
        let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        if selection.is_empty() {
            metrics.build_cost += start.elapsed();
            return Ok(None);
        }

        // Trigger background download if metadata had a cache miss and selection is not empty
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        // Create ArrowReaderMetadata for async stream building.
        let arrow_reader_options =
            ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
        let arrow_metadata =
            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
                .context(ReadDataPartSnafu)?;

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().as_ref().clone()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let filter_plan = build_reader_filter_plan(
            self.predicate.as_ref(),
            self.expected_metadata.as_deref(),
            self.pre_filter_mode,
            &read_format,
            parquet_meta.file_metadata().schema_descr(),
            &codec,
        );

        let output_schema = read_format.output_arrow_schema()?;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            arrow_metadata,
            output_schema,
            object_store: self.object_store.clone(),
            projection: projection_plan,
            has_nested_projection: parquet_read_cols.has_nested(),
            cache_strategy: self.cache_strategy.clone(),
            prefilter_builder: filter_plan.prefilter_builder,
        };

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters: filter_plan.remaining_simple_filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok(Some((context, selection)))
    }

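    /// Parses the region partition expression and compares it with the file's
    /// partition expression. Returns the parsed expression and whether both match.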
    fn is_same_region_partition(
        region_partition_expr_str: Option<&str>,
        file_partition_expr: Option<&PartitionExpr>,
    ) -> Result<(Option<PartitionExpr>, bool)> {
        let region_partition_expr = match region_partition_expr_str {
            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
            None => None,
        };

        let is_same = region_partition_expr.as_ref() == file_partition_expr;
        Ok((region_partition_expr, is_same))
    }

    /// Compares the partition expressions from the expected metadata and the file
    /// metadata, and builds a partition filter if they differ.
    fn build_partition_filter(
        &self,
        read_format: &FlatReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collect columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a partition_schema containing only referenced columns.
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }

    /// Reads parquet metadata of a specific file.
    /// Returns `(fused metadata, cache_miss_flag)`.
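    ///
    /// It first tries the SST metadata cache; on a miss it loads the metadata
    /// from object storage and puts it back into the cache.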
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        // Cache the metadata.
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

    /// Computes row groups to read, along with their respective row selections.
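    ///
    /// Pruning runs in stages: min-max statistics first, then the fulltext,
    /// inverted, and bloom filter indexes, and finally the vector index when
    /// the `vector_index` feature is enabled, returning early once the
    /// selection becomes empty.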
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &FlatReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.pre_filter_mode.skip_fields();

        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
    /// as an escape route in case of index issues; it can also be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    /// Prunes row groups by vector index results.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes the row group selection after min-max pruning.
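    ///
    /// When the predicate has no dynamic filters, the pruning result is cached
    /// per predicate so identical predicates across queries can skip rebuilding
    /// the row-group pruning stats.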
    fn row_groups_by_minmax(
        &self,
        read_format: &FlatReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
                // building row-group pruning stats for identical predicates across queries.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());
        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}
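
/// Intersects `output` with `result` and records in `metrics` how many row
/// groups and rows the index filtered out.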
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

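/// Converts global row offsets returned by the vector index into a
/// [RowGroupSelection], failing if any offset exceeds `u32::MAX`.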
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let mut row_ids = BTreeSet::new();
    for offset in row_offsets {
        let row_id = u32::try_from(offset).map_err(|_| {
            ApplyVectorIndexSnafu {
                reason: format!("Row offset {} exceeds u32::MAX", offset),
            }
            .build()
        })?;
        row_ids.insert(row_id);
    }
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}

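/// Returns `true` if every non-empty row group required by `required_row_groups`
/// is already covered by `cached_row_groups`.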
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no rows does not need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    pub(crate) minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    pub(crate) minmax_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to these metrics.
1313    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1314        self.rg_total += other.rg_total;
1315        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1316        self.rg_inverted_filtered += other.rg_inverted_filtered;
1317        self.rg_minmax_filtered += other.rg_minmax_filtered;
1318        self.rg_bloom_filtered += other.rg_bloom_filtered;
1319        self.rg_vector_filtered += other.rg_vector_filtered;
1320
1321        self.rows_total += other.rows_total;
1322        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1323        self.rows_inverted_filtered += other.rows_inverted_filtered;
1324        self.rows_bloom_filtered += other.rows_bloom_filtered;
1325        self.rows_vector_filtered += other.rows_vector_filtered;
1326        self.rows_vector_selected += other.rows_vector_selected;
1327        self.rows_precise_filtered += other.rows_precise_filtered;
1328
1329        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1330        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1331        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1332        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1333        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1334        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1335        self.minmax_cache_hit += other.minmax_cache_hit;
1336        self.minmax_cache_miss += other.minmax_cache_miss;
1337
1338        self.pruner_cache_hit += other.pruner_cache_hit;
1339        self.pruner_cache_miss += other.pruner_cache_miss;
1340        self.pruner_prune_cost += other.pruner_prune_cost;
1341
1342        // Merge optional applier metrics
1343        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1344            self.inverted_index_apply_metrics
1345                .get_or_insert_with(Default::default)
1346                .merge_from(other_metrics);
1347        }
1348        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1349            self.bloom_filter_apply_metrics
1350                .get_or_insert_with(Default::default)
1351                .merge_from(other_metrics);
1352        }
1353        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1354            self.fulltext_index_apply_metrics
1355                .get_or_insert_with(Default::default)
1356                .merge_from(other_metrics);
1357        }
1358    }
1359
1360    /// Reports metrics.
1361    pub(crate) fn observe(&self) {
1362        READ_ROW_GROUPS_TOTAL
1363            .with_label_values(&["before_filtering"])
1364            .inc_by(self.rg_total as u64);
1365        READ_ROW_GROUPS_TOTAL
1366            .with_label_values(&["fulltext_index_filtered"])
1367            .inc_by(self.rg_fulltext_filtered as u64);
1368        READ_ROW_GROUPS_TOTAL
1369            .with_label_values(&["inverted_index_filtered"])
1370            .inc_by(self.rg_inverted_filtered as u64);
1371        READ_ROW_GROUPS_TOTAL
1372            .with_label_values(&["minmax_index_filtered"])
1373            .inc_by(self.rg_minmax_filtered as u64);
1374        READ_ROW_GROUPS_TOTAL
1375            .with_label_values(&["bloom_filter_index_filtered"])
1376            .inc_by(self.rg_bloom_filtered as u64);
1377        READ_ROW_GROUPS_TOTAL
1378            .with_label_values(&["vector_index_filtered"])
1379            .inc_by(self.rg_vector_filtered as u64);
1380
1381        PRECISE_FILTER_ROWS_TOTAL
1382            .with_label_values(&["parquet"])
1383            .inc_by(self.rows_precise_filtered as u64);
1384        READ_ROWS_IN_ROW_GROUP_TOTAL
1385            .with_label_values(&["before_filtering"])
1386            .inc_by(self.rows_total as u64);
1387        READ_ROWS_IN_ROW_GROUP_TOTAL
1388            .with_label_values(&["fulltext_index_filtered"])
1389            .inc_by(self.rows_fulltext_filtered as u64);
1390        READ_ROWS_IN_ROW_GROUP_TOTAL
1391            .with_label_values(&["inverted_index_filtered"])
1392            .inc_by(self.rows_inverted_filtered as u64);
1393        READ_ROWS_IN_ROW_GROUP_TOTAL
1394            .with_label_values(&["bloom_filter_index_filtered"])
1395            .inc_by(self.rows_bloom_filtered as u64);
1396        READ_ROWS_IN_ROW_GROUP_TOTAL
1397            .with_label_values(&["vector_index_filtered"])
1398            .inc_by(self.rows_vector_filtered as u64);
1399    }
1400
1401    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1402        match index_type {
1403            INDEX_TYPE_FULLTEXT => {
1404                self.rg_fulltext_filtered += row_group_count;
1405                self.rows_fulltext_filtered += row_count;
1406            }
1407            INDEX_TYPE_INVERTED => {
1408                self.rg_inverted_filtered += row_group_count;
1409                self.rows_inverted_filtered += row_count;
1410            }
1411            INDEX_TYPE_BLOOM => {
1412                self.rg_bloom_filtered += row_group_count;
1413                self.rows_bloom_filtered += row_count;
1414            }
1415            INDEX_TYPE_VECTOR => {
1416                self.rg_vector_filtered += row_group_count;
1417                self.rows_vector_filtered += row_count;
1418            }
1419            _ => {}
1420        }
1421    }
1422}
1423
1424#[cfg(all(test, feature = "vector_index"))]
1425mod vector_index_tests {
1426    use super::*;
1427
1428    #[test]
1429    fn test_vector_selection_from_offsets() {
1430        let row_group_size = 4;
1431        let num_row_groups = 3;
1432        let selection =
1433            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
1434                .unwrap();
1435
1436        assert_eq!(selection.row_group_count(), 3);
1437        assert_eq!(selection.row_count(), 4);
1438        assert!(selection.contains_non_empty_row_group(0));
1439        assert!(selection.contains_non_empty_row_group(1));
1440        assert!(selection.contains_non_empty_row_group(2));
1441    }
1442
1443    #[test]
1444    fn test_vector_selection_from_offsets_out_of_range() {
1445        let row_group_size = 4;
1446        let num_row_groups = 2;
1447        let selection = vector_selection_from_offsets(
1448            vec![0, 7, u64::from(u32::MAX) + 1],
1449            row_group_size,
1450            num_row_groups,
1451        );
1452        assert!(selection.is_err());
1453    }
1454
1455    #[test]
1456    fn test_vector_selection_updates_metrics() {
1457        let row_group_size = 4;
1458        let total_rows = 8;
1459        let mut output = RowGroupSelection::new(row_group_size, total_rows);
1460        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
1461        let mut metrics = ReaderFilterMetrics::default();
1462
1463        apply_selection_and_update_metrics(
1464            &mut output,
1465            &selection,
1466            &mut metrics,
1467            INDEX_TYPE_VECTOR,
1468        );
1469
1470        assert_eq!(metrics.rg_vector_filtered, 1);
1471        assert_eq!(metrics.rows_vector_filtered, 7);
1472        assert_eq!(output.row_count(), 1);
1473    }
1474}
1475
1476/// Metrics for parquet metadata cache operations.
1477#[derive(Default, Clone, Copy)]
1478pub(crate) struct MetadataCacheMetrics {
1479    /// Number of memory cache hits for parquet metadata.
1480    pub(crate) mem_cache_hit: usize,
1481    /// Number of file cache hits for parquet metadata.
1482    pub(crate) file_cache_hit: usize,
1483    /// Number of cache misses for parquet metadata.
1484    pub(crate) cache_miss: usize,
1485    /// Duration to load parquet metadata.
1486    pub(crate) metadata_load_cost: Duration,
1487    /// Number of read operations performed.
1488    pub(crate) num_reads: usize,
1489    /// Total bytes read from storage.
1490    pub(crate) bytes_read: u64,
1491}
1492
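// The `Debug` impl below renders the metrics as compact JSON-like output,
// omitting zero-valued fields; e.g. (illustrative values):
// {"metadata_load_cost":"1.2ms", "mem_cache_hit":3, "num_reads":2, "bytes_read":65536}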
1493impl std::fmt::Debug for MetadataCacheMetrics {
1494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1495        let Self {
1496            mem_cache_hit,
1497            file_cache_hit,
1498            cache_miss,
1499            metadata_load_cost,
1500            num_reads,
1501            bytes_read,
1502        } = self;
1503
1504        if self.is_empty() {
1505            return write!(f, "{{}}");
1506        }
1507        write!(f, "{{")?;
1508
1509        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;
1510
1511        if *mem_cache_hit > 0 {
1512            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
1513        }
1514        if *file_cache_hit > 0 {
1515            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
1516        }
1517        if *cache_miss > 0 {
1518            write!(f, ", \"cache_miss\":{}", cache_miss)?;
1519        }
1520        if *num_reads > 0 {
1521            write!(f, ", \"num_reads\":{}", num_reads)?;
1522        }
1523        if *bytes_read > 0 {
1524            write!(f, ", \"bytes_read\":{}", bytes_read)?;
1525        }
1526
1527        write!(f, "}}")
1528    }
1529}
1530
1531impl MetadataCacheMetrics {
1532    /// Returns true if the metrics are empty (contain no meaningful data).
1533    pub(crate) fn is_empty(&self) -> bool {
1534        self.metadata_load_cost.is_zero()
1535    }
1536
1537    /// Merges `other` metrics into this one.
1538    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1539        self.mem_cache_hit += other.mem_cache_hit;
1540        self.file_cache_hit += other.file_cache_hit;
1541        self.cache_miss += other.cache_miss;
1542        self.metadata_load_cost += other.metadata_load_cost;
1543        self.num_reads += other.num_reads;
1544        self.bytes_read += other.bytes_read;
1545    }
1546}
1547
1548/// Parquet reader metrics.
1549#[derive(Debug, Default, Clone)]
1550pub struct ReaderMetrics {
1551    /// Filtered row groups and rows metrics.
1552    pub(crate) filter_metrics: ReaderFilterMetrics,
1553    /// Duration to build the parquet reader.
1554    pub(crate) build_cost: Duration,
1555    /// Duration to scan the reader.
1556    pub(crate) scan_cost: Duration,
1557    /// Number of record batches read.
1558    pub(crate) num_record_batches: usize,
1559    /// Number of batches decoded.
1560    pub(crate) num_batches: usize,
1561    /// Number of rows read.
1562    pub(crate) num_rows: usize,
1563    /// Metrics for parquet metadata cache.
1564    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
1565    /// Optional metrics for page/row group fetch operations.
1566    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
1567    /// Memory size of metadata loaded for building file ranges.
1568    pub(crate) metadata_mem_size: isize,
1569    /// Number of file range builders created.
1570    pub(crate) num_range_builders: isize,
1571}
1572
1573impl ReaderMetrics {
1574    /// Merges `other` metrics into this one.
1575    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1576        self.filter_metrics.merge_from(&other.filter_metrics);
1577        self.build_cost += other.build_cost;
1578        self.scan_cost += other.scan_cost;
1579        self.num_record_batches += other.num_record_batches;
1580        self.num_batches += other.num_batches;
1581        self.num_rows += other.num_rows;
1582        self.metadata_cache_metrics
1583            .merge_from(&other.metadata_cache_metrics);
1584        if let Some(other_fetch) = &other.fetch_metrics {
1585            if let Some(self_fetch) = &self.fetch_metrics {
1586                self_fetch.merge_from(other_fetch);
1587            } else {
1588                self.fetch_metrics = Some(other_fetch.clone());
1589            }
1590        }
1591        self.metadata_mem_size += other.metadata_mem_size;
1592        self.num_range_builders += other.num_range_builders;
1593    }
1594
1595    /// Reports total rows.
1596    pub(crate) fn observe_rows(&self, read_type: &str) {
1597        READ_ROWS_TOTAL
1598            .with_label_values(&[read_type])
1599            .inc_by(self.num_rows as u64);
1600    }
1601}
1602
1603/// Builder to build a [ParquetRecordBatchStream] for a row group.
1604pub(crate) struct RowGroupReaderBuilder {
1605    /// SST file to read.
1606    ///
1607    /// Holds the file handle so the underlying file won't be purged while reading.
1608    file_handle: FileHandle,
1609    /// Path of the file.
1610    file_path: String,
1611    /// Metadata of the parquet file.
1612    parquet_meta: Arc<ParquetMetaData>,
1613    /// Arrow reader metadata for building async stream.
1614    arrow_metadata: ArrowReaderMetadata,
1615    /// Projected output schema aligned with `projection.projected_root_presence`.
1616    output_schema: SchemaRef,
1617    /// Object store as an Operator.
1618    object_store: ObjectStore,
1619    /// Projection mask.
1620    projection: ProjectionMaskPlan,
1621    /// Whether projected read columns include nested paths.
1622    has_nested_projection: bool,
1623    /// Cache.
1624    cache_strategy: CacheStrategy,
1625    /// Pre-built prefilter state. `None` if prefiltering is not applicable.
1626    prefilter_builder: Option<PrefilterContextBuilder>,
1627}
1628
1629/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
1630/// needed for prefiltering decisions.
1631pub(crate) struct RowGroupBuildContext<'a> {
1632    /// Index of the row group to read.
1633    pub(crate) row_group_idx: usize,
1634    /// Row selection for the row group. `None` means all rows.
1635    pub(crate) row_selection: Option<RowSelection>,
1636    /// Metrics for tracking fetch operations.
1637    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
1638}
1639
1640impl RowGroupReaderBuilder {
1641    /// Path of the file to read.
1642    pub(crate) fn file_path(&self) -> &str {
1643        &self.file_path
1644    }
1645
1646    /// Handle of the file to read.
1647    pub(crate) fn file_handle(&self) -> &FileHandle {
1648        &self.file_handle
1649    }
1650
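    /// Returns the parquet metadata of the file.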
1651    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1652        &self.parquet_meta
1653    }
1654
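    /// Returns the cache strategy of this builder.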
1655    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1656        &self.cache_strategy
1657    }
1658
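    /// Returns true if a predicate prefilter has been built for this reader.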
1659    pub(crate) fn has_predicate_prefilter(&self) -> bool {
1660        self.prefilter_builder.is_some()
1661    }
1662
1663    /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
1664    ///
1665    /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
1666    /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
1667    /// 2. Reads the full projection with the refined row selection
1668    ///
1669    /// The prefilter pass is *best-effort pruning*, not the precise filter for the query.
1670    /// Predicates that cannot be lowered to prefilter columns (column not projected,
1671    /// expression not supported, etc.) are silently skipped. Correctness rests on the
1672    /// DataFusion `FilterExec` above this reader, which always re-applies the original
1673    /// predicate. Tag and timestamp predicates that flow through [`SimpleFilterEvaluator`]
1674    /// are an exception — the engine enforces them precisely, so the prefilter pass is the
1675    /// only place they execute. See [`build_reader_filter_plan`] for the bucketing rules.
1676    ///
1677    /// When the prefilter result selects no rows, the second read still issues but
1678    /// parquet-rs short-circuits before any column-chunk IO: the row-group state machine
1679    /// jumps to `Finished` once it sees `num_rows_selected() == 0`, so no fast path is
1680    /// added here.
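    ///
    /// Illustrative call (a sketch, not a doctest; the builder and context are
    /// crate-internal and need a real SST file to construct):
    ///
    /// ```ignore
    /// let stream = reader_builder
    ///     .build(RowGroupBuildContext {
    ///         row_group_idx: 0,
    ///         row_selection: None, // `None` reads all rows in the row group.
    ///         fetch_metrics: None,
    ///     })
    ///     .await?;
    /// ```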
1681    pub(crate) async fn build(
1682        &self,
1683        build_ctx: RowGroupBuildContext<'_>,
1684    ) -> Result<ProjectedRecordBatchStream> {
1685        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1686
1687        let Some(mut prefilter_ctx) = prefilter_ctx else {
1688            // No prefilter applicable, build stream with full projection.
1689            let stream = self
1690                .build_with_projection(
1691                    build_ctx.row_group_idx,
1692                    build_ctx.row_selection,
1693                    self.projection.mask.clone(),
1694                    build_ctx.fetch_metrics,
1695                )
1696                .await?;
1697            return self.make_projected_stream(stream);
1698        };
1699
1700        let prefilter_start = Instant::now();
1701        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1702        if let Some(metrics) = build_ctx.fetch_metrics {
1703            let mut data = metrics.data.lock().unwrap();
1704            data.prefilter_cost += prefilter_start.elapsed();
1705            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1706        }
1707
1708        let refined_selection = Some(prefilter_result.refined_selection);
1709
1710        let stream = self
1711            .build_with_projection(
1712                build_ctx.row_group_idx,
1713                refined_selection,
1714                self.projection.mask.clone(),
1715                build_ctx.fetch_metrics,
1716            )
1717            .await?;
1718        self.make_projected_stream(stream)
1719    }
1720
1721    fn make_projected_stream(
1722        &self,
1723        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
1724    ) -> Result<ProjectedRecordBatchStream> {
1725        let stream = ParquetErrorAdapter::new(stream, self.file_path.clone());
1726        if !self.has_nested_projection {
1727            return Ok(stream.boxed());
1728        }
1729
1730        Ok(NestedSchemaAligner::new(
1731            stream,
1732            self.projection.projected_root_presence.clone(),
1733            self.output_schema.clone(),
1734        )?
1735        .boxed())
1736    }
1737
1738    /// Builds a [ParquetRecordBatchStream] with a custom projection mask.
1739    pub(crate) async fn build_with_projection(
1740        &self,
1741        row_group_idx: usize,
1742        row_selection: Option<RowSelection>,
1743        projection: ProjectionMask,
1744        fetch_metrics: Option<&ParquetFetchMetrics>,
1745    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
1746        // Create async file reader with caching support.
1747        let async_reader = SstAsyncFileReader::new(
1748            self.file_handle.file_id(),
1749            self.file_path.clone(),
1750            self.object_store.clone(),
1751            self.cache_strategy.clone(),
1752            self.parquet_meta.clone(),
1753            row_group_idx,
1754        )
1755        .with_fetch_metrics(fetch_metrics.cloned());
1756
1757        // Build the async stream using ArrowReaderBuilder API.
1758        let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
1759            async_reader,
1760            self.arrow_metadata.clone(),
1761        );
1762        builder = builder
1763            .with_row_groups(vec![row_group_idx])
1764            .with_projection(projection)
1765            .with_batch_size(DEFAULT_READ_BATCH_SIZE);
1766
1767        if let Some(selection) = row_selection {
1768            builder = builder.with_row_selection(selection);
1769        }
1770
1771        let stream = builder.build().context(ReadParquetSnafu {
1772            path: &self.file_path,
1773        })?;
1774
1775        Ok(stream)
1776    }
1777}
1778
1779/// The filter to evaluate, or the result of pruning against the column's default value.
1780#[derive(Clone)]
1781pub(crate) enum MaybeFilter {
1782    /// The filter to evaluate.
1783    Filter(SimpleFilterEvaluator),
1784    /// The filter matches the default value.
1785    Matched,
1786    /// The filter is pruned.
1787    Pruned,
1788}
1789
1790impl MaybeFilter {
1791    /// Returns the inner filter when it is available.
1792    pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1793        match self {
1794            MaybeFilter::Filter(filter) => Some(filter),
1795            MaybeFilter::Matched | MaybeFilter::Pruned => None,
1796        }
1797    }
1798}
1799
1800/// Context to evaluate the column filter for a parquet file.
1801#[derive(Clone)]
1802pub(crate) struct SimpleFilterContext {
1803    /// Filter to evaluate.
1804    filter: MaybeFilter,
1805    /// Id of the column to evaluate.
1806    column_id: ColumnId,
1807    /// Semantic type of the column.
1808    semantic_type: SemanticType,
1809}
1810
1811impl SimpleFilterContext {
1812    /// Creates a context for the `expr`.
1813    ///
1814    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1815    /// expected metadata.
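    ///
    /// E.g. (illustrative): for a filter like `tag = "a"` on a column that is
    /// absent from the SST, the context holds [MaybeFilter::Pruned] when the
    /// column's default value doesn't match the filter and [MaybeFilter::Matched]
    /// when it does; columns present in the SST keep [MaybeFilter::Filter].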
1816    pub(crate) fn new_opt(
1817        sst_meta: &RegionMetadataRef,
1818        expected_meta: Option<&RegionMetadata>,
1819        expr: &Expr,
1820    ) -> Option<Self> {
1821        let filter = SimpleFilterEvaluator::try_new(expr)?;
1822        let (column_metadata, maybe_filter) = match expected_meta {
1823            Some(meta) => {
1824                // Gets the column metadata from the expected metadata.
1825                let column = meta.column_by_name(filter.column_name())?;
1826                // Checks if the column is present in the SST metadata. We still use the
1827                // column from the expected metadata.
1828                match sst_meta.column_by_id(column.column_id) {
1829                    Some(sst_column) => {
1830                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1831
1832                        (column, MaybeFilter::Filter(filter))
1833                    }
1834                    None => {
1835                        // If the column is not present in the SST metadata, we evaluate the filter
1836                        // against the default value of the column.
1837                        // If we can't evaluate the filter, we return None.
1838                        if pruned_by_default(&filter, column)? {
1839                            (column, MaybeFilter::Pruned)
1840                        } else {
1841                            (column, MaybeFilter::Matched)
1842                        }
1843                    }
1844                }
1845            }
1846            None => {
1847                let column = sst_meta.column_by_name(filter.column_name())?;
1848                (column, MaybeFilter::Filter(filter))
1849            }
1850        };
1851
1852        Some(Self {
1853            filter: maybe_filter,
1854            column_id: column_metadata.column_id,
1855            semantic_type: column_metadata.semantic_type,
1856        })
1857    }
1858
1859    /// Returns the filter to evaluate.
1860    pub(crate) fn filter(&self) -> &MaybeFilter {
1861        &self.filter
1862    }
1863
1864    /// Returns the column id.
1865    pub(crate) fn column_id(&self) -> ColumnId {
1866        self.column_id
1867    }
1868
1869    /// Returns the semantic type of the column.
1870    pub(crate) fn semantic_type(&self) -> SemanticType {
1871        self.semantic_type
1872    }
1873}
1874
1875/// Context to evaluate a physical expression for a parquet file.
1876#[derive(Clone)]
1877pub(crate) struct PhysicalFilterContext {
1878    /// Filter to evaluate.
1879    filter: Arc<dyn PhysicalExpr>,
1880    /// Id of the column to evaluate.
1881    column_id: ColumnId,
1882    /// Name of the column to evaluate.
1883    column_name: String,
1884    /// Semantic type of the column.
1885    semantic_type: SemanticType,
1886    /// Schema containing only the referenced column.
1887    schema: SchemaRef,
1888}
1889
1890impl PhysicalFilterContext {
1891    /// Creates a context for the `expr`.
1892    ///
1893    /// Returns None if the expression doesn't reference exactly one column or the
1894    /// column to filter doesn't exist in the SST metadata or the expected metadata.
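    ///
    /// E.g. (illustrative): `tag_0 IN ('a', 'b')` builds a context, while
    /// `tag_0 = 'a'` yields `None` because binary exprs are handled by
    /// [SimpleFilterEvaluator] instead; see `is_prefilter_candidate`.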
1895    pub(crate) fn new_opt(
1896        sst_meta: &RegionMetadataRef,
1897        expected_meta: Option<&RegionMetadata>,
1898        read_format: &FlatReadFormat,
1899        expr: &Expr,
1900    ) -> Option<Self> {
1901        if !Self::is_prefilter_candidate(expr) {
1902            return None;
1903        }
1904        let column_name = Self::single_column_name(expr)?;
1905        let column_metadata = match expected_meta {
1906            Some(meta) => {
1907                let column = meta.column_by_name(&column_name)?;
1908                let sst_column = sst_meta.column_by_id(column.column_id)?;
1909                // Physical expr requires the column name to match the SST column name.
1910                if sst_column.column_schema.name != column_name {
1911                    return None;
1912                }
1913                column
1914            }
1915            None => sst_meta.column_by_name(&column_name)?,
1916        };
1917
1918        // The column must be present in the projected arrow schema for the
1919        // prefilter to be able to read it.
1920        let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
1921        let field = field.clone();
1922        let schema = Arc::new(ArrowSchema::new(vec![field]));
1923        let physical_expr = Predicate::to_physical_expr(expr, &schema)
1924            .inspect_err(|e| {
1925                error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
1926            })
1927            .ok()?;
1928
1929        Some(Self {
1930            filter: physical_expr,
1931            column_id: column_metadata.column_id,
1932            column_name,
1933            semantic_type: column_metadata.semantic_type,
1934            schema,
1935        })
1936    }
1937
1938    /// Returns true if the expression is a variant we want to evaluate as a
1939    /// physical prefilter. Binary exprs are intentionally excluded because
1940    /// [`SimpleFilterEvaluator`] already handles them.
1941    // TODO(yingwen): extend more expressions if necessary. For example, allow some cheap scalar functions (e.g. `lower`, `length`, date truncations)
1942    fn is_prefilter_candidate(expr: &Expr) -> bool {
1943        matches!(
1944            expr,
1945            Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
1946        )
1947    }
1948
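    /// Returns the single column name referenced by `expr`, or `None` when the
    /// expression references zero or multiple columns (or cannot be analyzed).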
1949    fn single_column_name(expr: &Expr) -> Option<String> {
1950        let mut columns = HashSet::new();
1951        if expr_to_columns(expr, &mut columns).is_err() {
1952            return None;
1953        }
1954        if columns.len() != 1 {
1955            return None;
1956        }
1957        columns.iter().next().map(|column| column.name.clone())
1958    }
1959
1960    /// Returns the filter to evaluate.
1961    pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
1962        &self.filter
1963    }
1964
1965    /// Returns the column id.
1966    pub(crate) fn column_id(&self) -> ColumnId {
1967        self.column_id
1968    }
1969
1970    /// Returns the column name.
1971    pub(crate) fn column_name(&self) -> &str {
1972        &self.column_name
1973    }
1974
1975    /// Returns the semantic type of the column.
1976    pub(crate) fn semantic_type(&self) -> SemanticType {
1977        self.semantic_type
1978    }
1979
1980    /// Returns the schema containing only the referenced column.
1981    pub(crate) fn schema(&self) -> &SchemaRef {
1982        &self.schema
1983    }
1984}
1985
1986/// Prunes a column by its default value.
1987/// Returns `None` if we can't create the default value or evaluate the filter.
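/// E.g. (illustrative): with a default value of `0` and a filter `col > 10`, the
/// default doesn't match, so this returns `Some(true)` and the column is pruned.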
1988fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1989    let value = column.column_schema.create_default().ok().flatten()?;
1990    let scalar_value = value
1991        .try_to_scalar_value(&column.column_schema.data_type)
1992        .ok()?;
1993    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1994    Some(!matches)
1995}
1996
1997/// Parquet batch reader to read our SST format.
1998pub struct ParquetReader {
1999    /// File range context.
2000    context: FileRangeContextRef,
2001    /// Row group selection to read.
2002    selection: RowGroupSelection,
2003    /// Reader of current row group.
2004    reader: Option<FlatPruneReader>,
2005    /// Metrics for tracking row group fetch operations.
2006    fetch_metrics: ParquetFetchMetrics,
2007}
2008
2009impl ParquetReader {
2010    #[tracing::instrument(
2011        skip_all,
2012        fields(
2013            region_id = %self.context.reader_builder().file_handle.region_id(),
2014            file_id = %self.context.reader_builder().file_handle.file_id()
2015        )
2016    )]
2017    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2018        loop {
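            // Drain batches from the current row-group reader first; once it is
            // exhausted, pop the next selected row group and build a new reader.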
2019            if let Some(reader) = &mut self.reader {
2020                if let Some(batch) = reader.next_batch().await? {
2021                    return Ok(Some(batch));
2022                }
2023                self.reader = None;
2024                continue;
2025            }
2026
2027            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
2028                return Ok(None);
2029            };
2030
2031            let skip_fields = self.context.pre_filter_mode().skip_fields();
2032            let parquet_reader = self
2033                .context
2034                .reader_builder()
2035                .build(self.context.build_context(
2036                    row_group_idx,
2037                    Some(row_selection),
2038                    Some(&self.fetch_metrics),
2039                ))
2040                .await?;
2041            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
2042                self.context.clone(),
2043                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
2044                skip_fields,
2045            ));
2046        }
2047    }

2048    /// Creates a new reader.
2049    #[tracing::instrument(
2050        skip_all,
2051        fields(
2052            region_id = %context.reader_builder().file_handle.region_id(),
2053            file_id = %context.reader_builder().file_handle.file_id()
2054        )
2055    )]
2056    pub(crate) async fn new(
2057        context: FileRangeContextRef,
2058        mut selection: RowGroupSelection,
2059    ) -> Result<Self> {
2060        let fetch_metrics = ParquetFetchMetrics::default();
2061        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
2062            let skip_fields = context.pre_filter_mode().skip_fields();
2063            let parquet_reader = context
2064                .reader_builder()
2065                .build(context.build_context(
2066                    row_group_idx,
2067                    Some(row_selection),
2068                    Some(&fetch_metrics),
2069                ))
2070                .await?;
2071            Some(FlatPruneReader::new_with_row_group_reader(
2072                context.clone(),
2073                FlatRowGroupReader::new(context.clone(), parquet_reader),
2074                skip_fields,
2075            ))
2076        } else {
2077            None
2078        };
2079
2080        Ok(ParquetReader {
2081            context,
2082            selection,
2083            reader,
2084            fetch_metrics,
2085        })
2086    }
2087
2088    /// Returns the metadata of the SST.
2089    pub fn metadata(&self) -> &RegionMetadataRef {
2090        self.context.read_format().metadata()
2091    }
2092
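    /// Returns the parquet metadata of the SST file.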
2093    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2094        self.context.reader_builder().parquet_meta.clone()
2095    }
2096}
2097
2098/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
2099pub(crate) struct FlatRowGroupReader {
2100    /// Context for file ranges.
2101    context: FileRangeContextRef,
2102    /// Inner parquet record batch stream.
2103    stream: ProjectedRecordBatchStream,
2104    /// Cached sequence array to override sequences.
2105    override_sequence: Option<ArrayRef>,
2106}
2107
2108impl FlatRowGroupReader {
2109    /// Creates a new flat reader from file range.
2110    pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2111        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2112        let override_sequence = context
2113            .read_format()
2114            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2115
2116        Self {
2117            context,
2118            stream,
2119            override_sequence,
2120        }
2121    }
2122
2123    /// Returns the next RecordBatch.
2124    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2125        match self.stream.next().await {
2126            Some(batch_result) => {
2127                let record_batch = batch_result?;
2128
2129                let record_batch = self
2130                    .context
2131                    .read_format()
2132                    .convert_batch(record_batch, self.override_sequence.as_ref())?;
2133                Ok(Some(record_batch))
2134            }
2135            None => Ok(None),
2136        }
2137    }
2138}
2139
2140#[cfg(test)]
2141mod tests {
2142    use std::any::Any;
2143    use std::fmt::{Debug, Formatter};
2144    use std::sync::{Arc, LazyLock};
2145
2146    use datafusion::arrow::datatypes::DataType;
2147    use datafusion_common::ScalarValue;
2148    use datafusion_expr::expr::ScalarFunction;
2149    use datafusion_expr::{
2150        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2151        col, lit,
2152    };
2153    use datatypes::arrow::array::{ArrayRef, Int64Array};
2154    use datatypes::arrow::record_batch::RecordBatch;
2155    use datatypes::prelude::ConcreteDataType;
2156    use datatypes::schema::ColumnSchema;
2157    use object_store::services::Memory;
2158    use parquet::arrow::ArrowWriter;
2159    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2160    use store_api::region_request::PathType;
2161    use table::predicate::Predicate;
2162
2163    use super::*;
2164    use crate::sst::parquet::metadata::MetadataLoader;
2165    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2166
2167    #[tokio::test(flavor = "current_thread")]
2168    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2169        #[derive(Eq, PartialEq, Hash)]
2170        struct PanicDebugUdf;
2171
2172        impl Debug for PanicDebugUdf {
2173            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2174                panic!("minmax predicate key should not format exprs when cache is disabled");
2175            }
2176        }
2177
2178        impl ScalarUDFImpl for PanicDebugUdf {
2179            fn as_any(&self) -> &dyn Any {
2180                self
2181            }
2182
2183            fn name(&self) -> &str {
2184                "panic_debug_udf"
2185            }
2186
2187            fn signature(&self) -> &Signature {
2188                static SIGNATURE: LazyLock<Signature> =
2189                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2190                &SIGNATURE
2191            }
2192
2193            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2194                Ok(DataType::Int64)
2195            }
2196
2197            fn invoke_with_args(
2198                &self,
2199                _args: ScalarFunctionArgs,
2200            ) -> datafusion_common::Result<ColumnarValue> {
2201                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2202            }
2203        }
2204
2205        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2206        let file_handle = sst_file_handle(0, 1);
2207        let table_dir = "test_table".to_string();
2208        let path_type = PathType::Bare;
2209        let file_path = file_handle.file_path(&table_dir, path_type);
2210
2211        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2212        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2213        let mut parquet_bytes = Vec::new();
2214        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2215        writer.write(&batch).unwrap();
2216        writer.close().unwrap();
2217        let file_size = parquet_bytes.len() as u64;
2218        object_store.write(&file_path, parquet_bytes).await.unwrap();
2219
2220        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2221        let read_format = FlatReadFormat::new(
2222            region_metadata.clone(),
2223            ReadColumns::from_deduped_column_ids(
2224                region_metadata
2225                    .column_metadatas
2226                    .iter()
2227                    .map(|column| column.column_id),
2228            ),
2229            None,
2230            &file_path,
2231            false,
2232        )
2233        .unwrap();
2234
2235        let mut cache_metrics = MetadataCacheMetrics::default();
2236        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2237        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2238
2239        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2240        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2241            udf,
2242            vec![],
2243        ))]);
2244        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2245            .predicate(Some(predicate))
2246            .cache(CacheStrategy::Disabled);
2247
2248        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2249        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2250        let mut metrics = ReaderFilterMetrics::default();
2251        let selection = builder.row_groups_by_minmax(
2252            &read_format,
2253            &parquet_meta,
2254            row_group_size,
2255            total_row_count,
2256            &mut metrics,
2257            false,
2258        );
2259
2260        assert!(!selection.is_empty());
2261    }
2262
2263    fn expected_metadata_with_reused_tag_name(
2264        old_metadata: &RegionMetadata,
2265    ) -> Arc<RegionMetadata> {
2266        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2267        builder
2268            .push_column_metadata(ColumnMetadata {
2269                column_schema: ColumnSchema::new(
2270                    "tag_0".to_string(),
2271                    ConcreteDataType::string_datatype(),
2272                    true,
2273                ),
2274                semantic_type: SemanticType::Tag,
2275                column_id: 10,
2276            })
2277            .push_column_metadata(ColumnMetadata {
2278                column_schema: ColumnSchema::new(
2279                    "tag_1".to_string(),
2280                    ConcreteDataType::string_datatype(),
2281                    true,
2282                ),
2283                semantic_type: SemanticType::Tag,
2284                column_id: 1,
2285            })
2286            .push_column_metadata(ColumnMetadata {
2287                column_schema: ColumnSchema::new(
2288                    "field_0".to_string(),
2289                    ConcreteDataType::uint64_datatype(),
2290                    true,
2291                ),
2292                semantic_type: SemanticType::Field,
2293                column_id: 2,
2294            })
2295            .push_column_metadata(ColumnMetadata {
2296                column_schema: ColumnSchema::new(
2297                    "ts".to_string(),
2298                    ConcreteDataType::timestamp_millisecond_datatype(),
2299                    false,
2300                ),
2301                semantic_type: SemanticType::Timestamp,
2302                column_id: 3,
2303            })
2304            .primary_key(vec![10, 1]);
2305
2306        Arc::new(builder.build().unwrap())
2307    }
2308
2309    #[test]
2310    fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
2311        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2312        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2313        let ctx = SimpleFilterContext::new_opt(
2314            &metadata,
2315            Some(expected_metadata.as_ref()),
2316            &col("tag_0").eq(lit("a")),
2317        )
2318        .unwrap();
2319        assert!(matches!(
2320            ctx.filter(),
2321            MaybeFilter::Matched | MaybeFilter::Pruned
2322        ));
2323    }
2324
2325    #[test]
2326    fn test_physical_filter_context_skips_renamed_column() {
2327        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2328        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2329        let read_format = FlatReadFormat::new(
2330            metadata.clone(),
2331            ReadColumns::from_deduped_column_ids(
2332                metadata.column_metadatas.iter().map(|c| c.column_id),
2333            ),
2334            None,
2335            "test",
2336            true,
2337        )
2338        .unwrap();
2339
2340        let ctx = PhysicalFilterContext::new_opt(
2341            &metadata,
2342            Some(expected_metadata.as_ref()),
2343            &read_format,
2344            &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
2345        );
2346
2347        assert!(ctx.is_none());
2348    }
2349
2350    #[test]
2351    fn test_physical_filter_context_only_accepts_prefilter_candidates() {
2352        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2353        let read_format = FlatReadFormat::new(
2354            metadata.clone(),
2355            ReadColumns::from_deduped_column_ids(
2356                metadata.column_metadatas.iter().map(|c| c.column_id),
2357            ),
2358            None,
2359            "test",
2360            true,
2361        )
2362        .unwrap();
2363
2364        // InList is on the allowlist — should build a context.
2365        let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
2366        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
2367
2368        // NOT IN uses the same variant with `negated: true` — also accepted.
2369        let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
2370        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some());
2371
2372        // IS NULL / IS NOT NULL are accepted.
2373        let is_null = col("tag_0").is_null();
2374        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
2375        let is_not_null = col("tag_0").is_not_null();
2376        assert!(
2377            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
2378        );
2379
2380        // BETWEEN is accepted.
2381        let between = col("field_0").between(lit(1_u64), lit(10_u64));
2382        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
2383
2384        // Binary expr is handled by SimpleFilterEvaluator — rejected here.
2385        let binary = col("tag_0").eq(lit("a"));
2386        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
2387    }
2388}