// mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15//! Parquet reader.
16
17#[cfg(feature = "vector_index")]
18use std::collections::BTreeSet;
19use std::collections::{HashSet, VecDeque};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{tracing, warn};
26use datafusion_expr::Expr;
27use datatypes::arrow::array::ArrayRef;
28use datatypes::arrow::datatypes::Field;
29use datatypes::arrow::error::ArrowError;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::data_type::ConcreteDataType;
32use datatypes::prelude::DataType;
33use mito_codec::row_converter::build_primary_key_codec;
34use object_store::ObjectStore;
35use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
36use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
37use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
38use partition::expr::PartitionExpr;
39use snafu::ResultExt;
40use store_api::codec::PrimaryKeyEncoding;
41use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
42use store_api::region_request::PathType;
43use store_api::storage::{ColumnId, FileId};
44use table::predicate::Predicate;
45
46use crate::cache::index::result_cache::PredicateKey;
47use crate::cache::{CacheStrategy, CachedSstMeta};
48#[cfg(feature = "vector_index")]
49use crate::error::ApplyVectorIndexSnafu;
50use crate::error::{
51    ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu,
52};
53use crate::metrics::{
54    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
55    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
56};
57use crate::read::flat_projection::CompactionProjectionMapper;
58use crate::read::prune::FlatPruneReader;
59use crate::read::{Batch, BatchReader};
60use crate::sst::file::FileHandle;
61use crate::sst::index::bloom_filter::applier::{
62    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
63};
64use crate::sst::index::fulltext_index::applier::{
65    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
66};
67use crate::sst::index::inverted_index::applier::{
68    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
69};
70#[cfg(feature = "vector_index")]
71use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
72use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
73use crate::sst::parquet::file_range::{
74    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
75    row_group_contains_delete,
76};
77use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
78use crate::sst::parquet::metadata::MetadataLoader;
79use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
80use crate::sst::parquet::row_selection::RowGroupSelection;
81use crate::sst::parquet::stats::RowGroupPruningStats;
82use crate::sst::tag_maybe_to_dictionary_field;
83
// Human-readable index type names, used in error/log messages produced by
// `handle_index_error!` and as labels when updating filter metrics.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";
88
/// Handles an error raised while applying an index to prune row groups.
///
/// In test builds (`test` cfg or the `test` feature) it panics so that index
/// bugs surface immediately; in production builds it only logs a warning,
/// because index application is best-effort — the caller skips the index and
/// continues scanning without it.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            // Fail fast in tests: a broken index must not be silently ignored.
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            // Production: degrade gracefully, the error is attached to the log record.
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
109
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store the SST file is read from.
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    // NOTE(review): each applier array has two slots; based on the pruning
    // methods below, slot 0 appears to target tags and slot 1 fields — confirm.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader use
    /// it get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    /// Policy controlling whether parquet page indexes are loaded with the metadata.
    page_index_policy: PageIndexPolicy,
}
151
152impl ParquetReaderBuilder {
153    /// Returns a new [ParquetReaderBuilder] to read specific SST.
154    pub fn new(
155        table_dir: String,
156        path_type: PathType,
157        file_handle: FileHandle,
158        object_store: ObjectStore,
159    ) -> ParquetReaderBuilder {
160        ParquetReaderBuilder {
161            table_dir,
162            path_type,
163            file_handle,
164            object_store,
165            predicate: None,
166            projection: None,
167            cache_strategy: CacheStrategy::Disabled,
168            inverted_index_appliers: [None, None],
169            bloom_filter_index_appliers: [None, None],
170            fulltext_index_appliers: [None, None],
171            #[cfg(feature = "vector_index")]
172            vector_index_applier: None,
173            #[cfg(feature = "vector_index")]
174            vector_index_k: None,
175            expected_metadata: None,
176            flat_format: false,
177            compaction: false,
178            pre_filter_mode: PreFilterMode::All,
179            decode_primary_key_values: false,
180            page_index_policy: Default::default(),
181        }
182    }
183
184    /// Attaches the predicate to the builder.
185    #[must_use]
186    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
187        self.predicate = predicate;
188        self
189    }
190
191    /// Attaches the projection to the builder.
192    ///
193    /// The reader only applies the projection to fields.
194    #[must_use]
195    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
196        self.projection = projection;
197        self
198    }
199
200    /// Attaches the cache to the builder.
201    #[must_use]
202    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
203        self.cache_strategy = cache;
204        self
205    }
206
207    /// Attaches the inverted index appliers to the builder.
208    #[must_use]
209    pub(crate) fn inverted_index_appliers(
210        mut self,
211        index_appliers: [Option<InvertedIndexApplierRef>; 2],
212    ) -> Self {
213        self.inverted_index_appliers = index_appliers;
214        self
215    }
216
217    /// Attaches the bloom filter index appliers to the builder.
218    #[must_use]
219    pub(crate) fn bloom_filter_index_appliers(
220        mut self,
221        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
222    ) -> Self {
223        self.bloom_filter_index_appliers = index_appliers;
224        self
225    }
226
227    /// Attaches the fulltext index appliers to the builder.
228    #[must_use]
229    pub(crate) fn fulltext_index_appliers(
230        mut self,
231        index_appliers: [Option<FulltextIndexApplierRef>; 2],
232    ) -> Self {
233        self.fulltext_index_appliers = index_appliers;
234        self
235    }
236
237    /// Attaches the vector index applier to the builder.
238    #[cfg(feature = "vector_index")]
239    #[must_use]
240    pub(crate) fn vector_index_applier(
241        mut self,
242        applier: Option<VectorIndexApplierRef>,
243        k: Option<usize>,
244    ) -> Self {
245        self.vector_index_applier = applier;
246        self.vector_index_k = k;
247        self
248    }
249
250    /// Attaches the expected metadata to the builder.
251    #[must_use]
252    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
253        self.expected_metadata = expected_metadata;
254        self
255    }
256
257    /// Sets the flat format flag.
258    #[must_use]
259    pub fn flat_format(mut self, flat_format: bool) -> Self {
260        self.flat_format = flat_format;
261        self
262    }
263
264    /// Sets the compaction flag.
265    #[must_use]
266    pub fn compaction(mut self, compaction: bool) -> Self {
267        self.compaction = compaction;
268        self
269    }
270
271    /// Sets the pre-filter mode.
272    #[must_use]
273    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
274        self.pre_filter_mode = pre_filter_mode;
275        self
276    }
277
278    /// Decodes primary key values eagerly when reading primary key format SSTs.
279    #[must_use]
280    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
281        self.decode_primary_key_values = decode;
282        self
283    }
284
285    #[must_use]
286    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
287        self.page_index_policy = page_index_policy;
288        self
289    }
290
291    /// Builds a [ParquetReader].
292    ///
293    /// This needs to perform IO operation.
294    #[tracing::instrument(
295        skip_all,
296        fields(
297            region_id = %self.file_handle.region_id(),
298            file_id = %self.file_handle.file_id()
299        )
300    )]
301    pub async fn build(&self) -> Result<Option<ParquetReader>> {
302        let mut metrics = ReaderMetrics::default();
303
304        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await?
305        else {
306            return Ok(None);
307        };
308        ParquetReader::new(Arc::new(context), selection)
309            .await
310            .map(Some)
311    }
312
313    /// Builds a [FileRangeContext] and collects row groups to read.
314    ///
315    /// This needs to perform IO operation.
316    #[tracing::instrument(
317        skip_all,
318        fields(
319            region_id = %self.file_handle.region_id(),
320            file_id = %self.file_handle.file_id()
321        )
322    )]
323    pub(crate) async fn build_reader_input(
324        &self,
325        metrics: &mut ReaderMetrics,
326    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
327        self.build_reader_input_inner(metrics, self.flat_format)
328            .await
329    }
330
    /// Shared core of [`Self::build`] and [`Self::build_reader_input`].
    ///
    /// Loads (possibly cached) parquet metadata, chooses the read format,
    /// prunes row groups via statistics and indexes, and assembles the
    /// [FileRangeContext] plus the surviving [RowGroupSelection].
    /// Returns `Ok(None)` when every row group is pruned out.
    async fn build_reader_input_inner(
        &self,
        metrics: &mut ReaderMetrics,
        flat_format: bool,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let (sst_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        let parquet_meta = sst_meta.parquet_metadata();
        let region_meta = sst_meta.region_metadata();
        // Partition expr of the region comes from the expected (latest) metadata,
        // the file's own expr comes from the file meta; compare them below.
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref())
            .map(|expr| expr.as_str());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str,
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        // Skip auto convert when:
        // - compaction is enabled
        // - region partition expr is same with file partition expr (no need to auto convert)
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Build a compaction projection helper when:
        // - compaction is enabled
        // - region partition expr differs from file partition expr
        // - flat format is enabled
        // - primary key encoding is sparse
        //
        // This is applied after row-group filtering to align batches with flat output schema
        // before compat handling.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && flat_format
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        // Resolve the read format from the explicit projection if present,
        // otherwise project all columns of the expected (or file) metadata.
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            // Lists all column ids to read, we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            // Override sequences with the file-level sequence from the file meta.
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Prune row groups with statistics and indexes.
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        if selection.is_empty() {
            metrics.build_cost += start.elapsed();
            return Ok(None);
        }

        // Trigger background download if metadata had a cache miss and selection is not empty
        // NOTE(review): `!selection.is_empty()` is always true here because of the
        // early return just above; the extra check is redundant but harmless.
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        // Schema used for pruning: prefer the expected (latest) region schema.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // Now we assumes we don't have nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        // Convert pushed-down exprs into simple filters; exprs that cannot be
        // converted are silently dropped (they were only an optimization).
        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().as_ref().clone()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        // Extra filter applied when the region partition expr differs from the file's.
        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok(Some((context, selection)))
    }
518
519    fn is_same_region_partition(
520        region_partition_expr_str: Option<&str>,
521        file_partition_expr: Option<&PartitionExpr>,
522    ) -> Result<(Option<PartitionExpr>, bool)> {
523        let region_partition_expr = match region_partition_expr_str {
524            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
525            None => None,
526        };
527
528        let is_same = region_partition_expr.as_ref() == file_partition_expr;
529        Ok((region_partition_expr, is_same))
530    }
531
    /// Compare partition expressions from expected metadata and file metadata,
    /// and build a partition filter if they differ.
    ///
    /// Returns `Ok(None)` when the expressions are equal or the region has no
    /// partition expression — in both cases no extra filtering is needed.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        // Same partition expr: rows in the file already belong to this region.
        if is_same_region_partition {
            return Ok(None);
        }

        // No region partition expr to evaluate: nothing to filter by.
        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collect columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a partition_schema containing only referenced columns.
        // In flat format, string tag columns may be stored as dictionary
        // arrays, so the column's data type must be adjusted to match.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        // Rewrite string tags to the (possibly dictionary) arrow
                        // field type used by flat-format batches.
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        // Lower the partition expression to a physical expression evaluated
        // against the reduced schema.
        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }
601
    /// Reads parquet metadata of specific file.
    /// Returns (fused metadata, cache_miss_flag).
    ///
    /// The flag is `true` when the metadata had to be loaded from the object
    /// store (cache miss); callers use it to trigger a background download.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        // Histogram timer observes on drop, covering the whole function.
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        // Cache the metadata.
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }
641
    /// Computes row groups to read, along with their respective row selections.
    ///
    /// Pruning pipeline (each stage narrows `output`, short-circuiting when
    /// it becomes empty): min/max statistics → fulltext index → inverted
    /// index → bloom filter → fulltext bloom (only if the fine fulltext index
    /// did not already filter) → vector index (feature-gated).
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            // Empty file: nothing to select.
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.compute_skip_fields(parquet_meta);

        // Stage 1: prune by min/max statistics.
        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        // Stage 2: fine fulltext index; remember whether it actually pruned.
        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        // Stage 3: inverted index.
        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 4: bloom filter index.
        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 5: fulltext bloom, only if the fine fulltext index did not
        // already apply (avoids double-filtering with the same predicates).
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        // Stage 6 (feature-gated): vector index for KNN search.
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }
749
    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    ///
    /// For each configured applier: first tries the index result cache; on a
    /// miss, applies the index against the file and caches the new result.
    /// Index errors are non-fatal — see `handle_index_error!`.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        // No fulltext index in this file: nothing to prune.
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // A cached result is only usable when it covers all row groups
            // still required by `output`.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                // `Ok(None)` means the index could not decide; leave `output`
                // untouched for this applier.
                Ok(None) => continue,
                Err(err) => {
                    // Best-effort: log (or panic in tests) and skip this index.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
819
820    /// Applies index to prune row groups.
821    ///
822    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
823    /// as an escape route in case of index issues, and it can be used to test
824    /// the correctness of the index.
825    async fn prune_row_groups_by_inverted_index(
826        &self,
827        row_group_size: usize,
828        num_row_groups: usize,
829        output: &mut RowGroupSelection,
830        metrics: &mut ReaderFilterMetrics,
831        skip_fields: bool,
832    ) -> bool {
833        if !self.file_handle.meta_ref().inverted_index_available() {
834            return false;
835        }
836
837        let mut pruned = false;
838        // If skip_fields is true, only apply the first applier (for tags).
839        let appliers = if skip_fields {
840            &self.inverted_index_appliers[..1]
841        } else {
842            &self.inverted_index_appliers[..]
843        };
844        for index_applier in appliers.iter().flatten() {
845            let predicate_key = index_applier.predicate_key();
846            // Fast path: return early if the result is in the cache.
847            let cached = self
848                .cache_strategy
849                .index_result_cache()
850                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
851            if let Some(result) = cached.as_ref()
852                && all_required_row_groups_searched(output, result)
853            {
854                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
855                metrics.inverted_index_cache_hit += 1;
856                pruned = true;
857                continue;
858            }
859
860            // Slow path: apply the index from the file.
861            metrics.inverted_index_cache_miss += 1;
862            let file_size_hint = self.file_handle.meta_ref().index_file_size();
863            let apply_res = index_applier
864                .apply(
865                    self.file_handle.index_id(),
866                    Some(file_size_hint),
867                    metrics.inverted_index_apply_metrics.as_mut(),
868                )
869                .await;
870            let selection = match apply_res {
871                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
872                    row_group_size,
873                    num_row_groups,
874                    apply_output,
875                ),
876                Err(err) => {
877                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
878                    continue;
879                }
880            };
881
882            self.apply_index_result_and_update_cache(
883                predicate_key,
884                self.file_handle.file_id().file_id(),
885                selection,
886                output,
887                metrics,
888                INDEX_TYPE_INVERTED,
889            );
890            pruned = true;
891        }
892        pruned
893    }
894
895    async fn prune_row_groups_by_bloom_filter(
896        &self,
897        row_group_size: usize,
898        parquet_meta: &ParquetMetaData,
899        output: &mut RowGroupSelection,
900        metrics: &mut ReaderFilterMetrics,
901        skip_fields: bool,
902    ) -> bool {
903        if !self.file_handle.meta_ref().bloom_filter_index_available() {
904            return false;
905        }
906
907        let mut pruned = false;
908        // If skip_fields is true, only apply the first applier (for tags).
909        let appliers = if skip_fields {
910            &self.bloom_filter_index_appliers[..1]
911        } else {
912            &self.bloom_filter_index_appliers[..]
913        };
914        for index_applier in appliers.iter().flatten() {
915            let predicate_key = index_applier.predicate_key();
916            // Fast path: return early if the result is in the cache.
917            let cached = self
918                .cache_strategy
919                .index_result_cache()
920                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
921            if let Some(result) = cached.as_ref()
922                && all_required_row_groups_searched(output, result)
923            {
924                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
925                metrics.bloom_filter_cache_hit += 1;
926                pruned = true;
927                continue;
928            }
929
930            // Slow path: apply the index from the file.
931            metrics.bloom_filter_cache_miss += 1;
932            let file_size_hint = self.file_handle.meta_ref().index_file_size();
933            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
934                (
935                    rg.num_rows() as usize,
936                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
937                    output.contains_non_empty_row_group(i)
938                        && cached
939                            .as_ref()
940                            .map(|c| !c.contains_row_group(i))
941                            .unwrap_or(true),
942                )
943            });
944            let apply_res = index_applier
945                .apply(
946                    self.file_handle.index_id(),
947                    Some(file_size_hint),
948                    rgs,
949                    metrics.bloom_filter_apply_metrics.as_mut(),
950                )
951                .await;
952            let mut selection = match apply_res {
953                Ok(apply_output) => {
954                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
955                }
956                Err(err) => {
957                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
958                    continue;
959                }
960            };
961
962            // New searched row groups are added to `selection`, concat them with `cached`.
963            if let Some(cached) = cached.as_ref() {
964                selection.concat(cached);
965            }
966
967            self.apply_index_result_and_update_cache(
968                predicate_key,
969                self.file_handle.file_id().file_id(),
970                selection,
971                output,
972                metrics,
973                INDEX_TYPE_BLOOM,
974            );
975            pruned = true;
976        }
977        pruned
978    }
979
    /// Prunes row groups by vector index results.
    ///
    /// Runs the vector index applier with the configured `k` and intersects
    /// the selected rows into `output`. Does nothing when the applier or `k`
    /// is unset, or the file carries no vector index.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        // All three preconditions must hold, otherwise `output` is left untouched.
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        // Index errors are reported via `handle_index_error!` and degrade to
        // "no pruning" rather than failing the read.
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        // Convert the raw row offsets into a row-group selection; offsets
        // beyond u32::MAX are reported as index errors.
        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }
1022
1023    async fn prune_row_groups_by_fulltext_bloom(
1024        &self,
1025        row_group_size: usize,
1026        parquet_meta: &ParquetMetaData,
1027        output: &mut RowGroupSelection,
1028        metrics: &mut ReaderFilterMetrics,
1029        skip_fields: bool,
1030    ) -> bool {
1031        if !self.file_handle.meta_ref().fulltext_index_available() {
1032            return false;
1033        }
1034
1035        let mut pruned = false;
1036        // If skip_fields is true, only apply the first applier (for tags).
1037        let appliers = if skip_fields {
1038            &self.fulltext_index_appliers[..1]
1039        } else {
1040            &self.fulltext_index_appliers[..]
1041        };
1042        for index_applier in appliers.iter().flatten() {
1043            let predicate_key = index_applier.predicate_key();
1044            // Fast path: return early if the result is in the cache.
1045            let cached = self
1046                .cache_strategy
1047                .index_result_cache()
1048                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
1049            if let Some(result) = cached.as_ref()
1050                && all_required_row_groups_searched(output, result)
1051            {
1052                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
1053                metrics.fulltext_index_cache_hit += 1;
1054                pruned = true;
1055                continue;
1056            }
1057
1058            // Slow path: apply the index from the file.
1059            metrics.fulltext_index_cache_miss += 1;
1060            let file_size_hint = self.file_handle.meta_ref().index_file_size();
1061            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
1062                (
1063                    rg.num_rows() as usize,
1064                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
1065                    output.contains_non_empty_row_group(i)
1066                        && cached
1067                            .as_ref()
1068                            .map(|c| !c.contains_row_group(i))
1069                            .unwrap_or(true),
1070                )
1071            });
1072            let apply_res = index_applier
1073                .apply_coarse(
1074                    self.file_handle.index_id(),
1075                    Some(file_size_hint),
1076                    rgs,
1077                    metrics.fulltext_index_apply_metrics.as_mut(),
1078                )
1079                .await;
1080            let mut selection = match apply_res {
1081                Ok(Some(apply_output)) => {
1082                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
1083                }
1084                Ok(None) => continue,
1085                Err(err) => {
1086                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
1087                    continue;
1088                }
1089            };
1090
1091            // New searched row groups are added to `selection`, concat them with `cached`.
1092            if let Some(cached) = cached.as_ref() {
1093                selection.concat(cached);
1094            }
1095
1096            self.apply_index_result_and_update_cache(
1097                predicate_key,
1098                self.file_handle.file_id().file_id(),
1099                selection,
1100                output,
1101                metrics,
1102                INDEX_TYPE_FULLTEXT,
1103            );
1104            pruned = true;
1105        }
1106        pruned
1107    }
1108
1109    /// Computes whether to skip field columns when building statistics based on PreFilterMode.
1110    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
1111        match self.pre_filter_mode {
1112            PreFilterMode::All => false,
1113            PreFilterMode::SkipFields => true,
1114            PreFilterMode::SkipFieldsOnDelete => {
1115                // Check if any row group contains delete op
1116                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
1117                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1118                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
1119                        .inspect_err(|e| {
1120                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
1121                        })
1122                        .unwrap_or(false)
1123                })
1124            }
1125        }
1126    }
1127
    /// Computes row groups selection after min-max pruning.
    ///
    /// Evaluates `self.predicate` against per-row-group statistics and returns
    /// the row groups that may contain matching rows. Results are cached per
    /// (predicate, file) when possible. Without a predicate, all row groups
    /// are selected.
    fn row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        // No predicate: nothing can be pruned, keep every row group.
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        // Only build a cache key for static predicates: results for dynamic
        // filters are not cached.
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
                // building row-group pruning stats for identical predicates across queries.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                // Sort so the key is insensitive to expression order.
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        // Fast path: reuse a previously cached pruning result for this key.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        // Slow path: prune with freshly built row-group statistics.
        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        // Prefer the expected (caller-side) schema when provided.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get a None statistics for that column.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        // Keep only row groups whose statistics may satisfy the predicate.
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        // Store the freshly computed selection for future queries.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }
1220
1221    fn apply_index_result_and_update_cache(
1222        &self,
1223        predicate_key: &PredicateKey,
1224        file_id: FileId,
1225        result: RowGroupSelection,
1226        output: &mut RowGroupSelection,
1227        metrics: &mut ReaderFilterMetrics,
1228        index_type: &str,
1229    ) {
1230        apply_selection_and_update_metrics(output, &result, metrics, index_type);
1231
1232        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1233            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1234        }
1235    }
1236}
1237fn apply_selection_and_update_metrics(
1238    output: &mut RowGroupSelection,
1239    result: &RowGroupSelection,
1240    metrics: &mut ReaderFilterMetrics,
1241    index_type: &str,
1242) {
1243    let intersection = output.intersect(result);
1244
1245    let row_group_count = output.row_group_count() - intersection.row_group_count();
1246    let row_count = output.row_count() - intersection.row_count();
1247
1248    metrics.update_index_metrics(index_type, row_group_count, row_count);
1249
1250    *output = intersection;
1251}
1252
#[cfg(feature = "vector_index")]
/// Converts raw `u64` row offsets into a [`RowGroupSelection`].
///
/// Fails with an apply-vector-index error if any offset does not fit in
/// `u32`.
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<_>>>()?;
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1275
1276fn all_required_row_groups_searched(
1277    required_row_groups: &RowGroupSelection,
1278    cached_row_groups: &RowGroupSelection,
1279) -> bool {
1280    required_row_groups.iter().all(|(rg_id, _)| {
1281        // Row group with no rows is not required to search.
1282        !required_row_groups.contains_non_empty_row_group(*rg_id)
1283            // The row group is already searched.
1284            || cached_row_groups.contains_row_group(*rg_id)
1285    })
1286}
1287
/// Metrics of filtering row groups and rows.
///
/// All counters are additive and can be merged via `merge_from`.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    pub(crate) minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    pub(crate) minmax_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
}
1350
1351impl ReaderFilterMetrics {
1352    /// Adds `other` metrics to this metrics.
1353    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1354        self.rg_total += other.rg_total;
1355        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1356        self.rg_inverted_filtered += other.rg_inverted_filtered;
1357        self.rg_minmax_filtered += other.rg_minmax_filtered;
1358        self.rg_bloom_filtered += other.rg_bloom_filtered;
1359        self.rg_vector_filtered += other.rg_vector_filtered;
1360
1361        self.rows_total += other.rows_total;
1362        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1363        self.rows_inverted_filtered += other.rows_inverted_filtered;
1364        self.rows_bloom_filtered += other.rows_bloom_filtered;
1365        self.rows_vector_filtered += other.rows_vector_filtered;
1366        self.rows_vector_selected += other.rows_vector_selected;
1367        self.rows_precise_filtered += other.rows_precise_filtered;
1368
1369        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1370        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1371        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1372        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1373        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1374        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1375        self.minmax_cache_hit += other.minmax_cache_hit;
1376        self.minmax_cache_miss += other.minmax_cache_miss;
1377
1378        self.pruner_cache_hit += other.pruner_cache_hit;
1379        self.pruner_cache_miss += other.pruner_cache_miss;
1380        self.pruner_prune_cost += other.pruner_prune_cost;
1381
1382        // Merge optional applier metrics
1383        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1384            self.inverted_index_apply_metrics
1385                .get_or_insert_with(Default::default)
1386                .merge_from(other_metrics);
1387        }
1388        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1389            self.bloom_filter_apply_metrics
1390                .get_or_insert_with(Default::default)
1391                .merge_from(other_metrics);
1392        }
1393        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1394            self.fulltext_index_apply_metrics
1395                .get_or_insert_with(Default::default)
1396                .merge_from(other_metrics);
1397        }
1398    }
1399
1400    /// Reports metrics.
1401    pub(crate) fn observe(&self) {
1402        READ_ROW_GROUPS_TOTAL
1403            .with_label_values(&["before_filtering"])
1404            .inc_by(self.rg_total as u64);
1405        READ_ROW_GROUPS_TOTAL
1406            .with_label_values(&["fulltext_index_filtered"])
1407            .inc_by(self.rg_fulltext_filtered as u64);
1408        READ_ROW_GROUPS_TOTAL
1409            .with_label_values(&["inverted_index_filtered"])
1410            .inc_by(self.rg_inverted_filtered as u64);
1411        READ_ROW_GROUPS_TOTAL
1412            .with_label_values(&["minmax_index_filtered"])
1413            .inc_by(self.rg_minmax_filtered as u64);
1414        READ_ROW_GROUPS_TOTAL
1415            .with_label_values(&["bloom_filter_index_filtered"])
1416            .inc_by(self.rg_bloom_filtered as u64);
1417        READ_ROW_GROUPS_TOTAL
1418            .with_label_values(&["vector_index_filtered"])
1419            .inc_by(self.rg_vector_filtered as u64);
1420
1421        PRECISE_FILTER_ROWS_TOTAL
1422            .with_label_values(&["parquet"])
1423            .inc_by(self.rows_precise_filtered as u64);
1424        READ_ROWS_IN_ROW_GROUP_TOTAL
1425            .with_label_values(&["before_filtering"])
1426            .inc_by(self.rows_total as u64);
1427        READ_ROWS_IN_ROW_GROUP_TOTAL
1428            .with_label_values(&["fulltext_index_filtered"])
1429            .inc_by(self.rows_fulltext_filtered as u64);
1430        READ_ROWS_IN_ROW_GROUP_TOTAL
1431            .with_label_values(&["inverted_index_filtered"])
1432            .inc_by(self.rows_inverted_filtered as u64);
1433        READ_ROWS_IN_ROW_GROUP_TOTAL
1434            .with_label_values(&["bloom_filter_index_filtered"])
1435            .inc_by(self.rows_bloom_filtered as u64);
1436        READ_ROWS_IN_ROW_GROUP_TOTAL
1437            .with_label_values(&["vector_index_filtered"])
1438            .inc_by(self.rows_vector_filtered as u64);
1439    }
1440
1441    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1442        match index_type {
1443            INDEX_TYPE_FULLTEXT => {
1444                self.rg_fulltext_filtered += row_group_count;
1445                self.rows_fulltext_filtered += row_count;
1446            }
1447            INDEX_TYPE_INVERTED => {
1448                self.rg_inverted_filtered += row_group_count;
1449                self.rows_inverted_filtered += row_count;
1450            }
1451            INDEX_TYPE_BLOOM => {
1452                self.rg_bloom_filtered += row_group_count;
1453                self.rows_bloom_filtered += row_count;
1454            }
1455            INDEX_TYPE_VECTOR => {
1456                self.rg_vector_filtered += row_group_count;
1457                self.rows_vector_filtered += row_count;
1458            }
1459            _ => {}
1460        }
1461    }
1462}
1463
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        // With row_group_size = 4: offsets 0 and 1 land in row group 0,
        // offset 5 in row group 1, offset 9 in row group 2.
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        // 3 row groups touched, 4 rows selected in total.
        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        // Any offset above u32::MAX must be rejected with an error.
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        // Start from a full selection (8 rows in 2 row groups) and intersect
        // with a single-row selection.
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        // One row group dropped entirely, 7 of 8 rows filtered out.
        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1515
/// Metrics for parquet metadata cache operations.
///
/// Counters are additive; see `merge_from`. The `Debug` impl renders them as
/// a compact JSON-like object.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations performed.
    pub(crate) num_reads: usize,
    /// Total bytes read from storage.
    pub(crate) bytes_read: u64,
}
1532
1533impl std::fmt::Debug for MetadataCacheMetrics {
1534    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1535        let Self {
1536            mem_cache_hit,
1537            file_cache_hit,
1538            cache_miss,
1539            metadata_load_cost,
1540            num_reads,
1541            bytes_read,
1542        } = self;
1543
1544        if self.is_empty() {
1545            return write!(f, "{{}}");
1546        }
1547        write!(f, "{{")?;
1548
1549        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;
1550
1551        if *mem_cache_hit > 0 {
1552            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
1553        }
1554        if *file_cache_hit > 0 {
1555            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
1556        }
1557        if *cache_miss > 0 {
1558            write!(f, ", \"cache_miss\":{}", cache_miss)?;
1559        }
1560        if *num_reads > 0 {
1561            write!(f, ", \"num_reads\":{}", num_reads)?;
1562        }
1563        if *bytes_read > 0 {
1564            write!(f, ", \"bytes_read\":{}", bytes_read)?;
1565        }
1566
1567        write!(f, "}}")
1568    }
1569}
1570
1571impl MetadataCacheMetrics {
1572    /// Returns true if the metrics are empty (contain no meaningful data).
1573    pub(crate) fn is_empty(&self) -> bool {
1574        self.metadata_load_cost.is_zero()
1575    }
1576
1577    /// Adds `other` metrics to this metrics.
1578    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1579        self.mem_cache_hit += other.mem_cache_hit;
1580        self.file_cache_hit += other.file_cache_hit;
1581        self.cache_miss += other.cache_miss;
1582        self.metadata_load_cost += other.metadata_load_cost;
1583        self.num_reads += other.num_reads;
1584        self.bytes_read += other.bytes_read;
1585    }
1586}
1587
/// Parquet reader metrics.
///
/// Aggregates filtering, cache, and scan statistics for a reader; instances
/// are combined with `merge_from`.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics for parquet metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Optional metrics for page/row group fetch operations, shared via `Arc`.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Memory size of metadata loaded for building file ranges.
    pub(crate) metadata_mem_size: isize,
    /// Number of file range builders created.
    pub(crate) num_range_builders: isize,
}
1612
1613impl ReaderMetrics {
1614    /// Adds `other` metrics to this metrics.
1615    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1616        self.filter_metrics.merge_from(&other.filter_metrics);
1617        self.build_cost += other.build_cost;
1618        self.scan_cost += other.scan_cost;
1619        self.num_record_batches += other.num_record_batches;
1620        self.num_batches += other.num_batches;
1621        self.num_rows += other.num_rows;
1622        self.metadata_cache_metrics
1623            .merge_from(&other.metadata_cache_metrics);
1624        if let Some(other_fetch) = &other.fetch_metrics {
1625            if let Some(self_fetch) = &self.fetch_metrics {
1626                self_fetch.merge_from(other_fetch);
1627            } else {
1628                self.fetch_metrics = Some(other_fetch.clone());
1629            }
1630        }
1631        self.metadata_mem_size += other.metadata_mem_size;
1632        self.num_range_builders += other.num_range_builders;
1633    }
1634
1635    /// Reports total rows.
1636    pub(crate) fn observe_rows(&self, read_type: &str) {
1637        READ_ROWS_TOTAL
1638            .with_label_values(&[read_type])
1639            .inc_by(self.num_rows as u64);
1640    }
1641}
1642
/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the underlying file is not purged while it
    /// is being read.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask that selects the columns to read.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache strategy, used when fetching row group data into memory.
    cache_strategy: CacheStrategy,
}
1662
1663impl RowGroupReaderBuilder {
1664    /// Path of the file to read.
1665    pub(crate) fn file_path(&self) -> &str {
1666        &self.file_path
1667    }
1668
1669    /// Handle of the file to read.
1670    pub(crate) fn file_handle(&self) -> &FileHandle {
1671        &self.file_handle
1672    }
1673
1674    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1675        &self.parquet_meta
1676    }
1677
1678    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1679        &self.cache_strategy
1680    }
1681
1682    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
1683    pub(crate) async fn build(
1684        &self,
1685        row_group_idx: usize,
1686        row_selection: Option<RowSelection>,
1687        fetch_metrics: Option<&ParquetFetchMetrics>,
1688    ) -> Result<ParquetRecordBatchReader> {
1689        let fetch_start = Instant::now();
1690
1691        let mut row_group = InMemoryRowGroup::create(
1692            self.file_handle.region_id(),
1693            self.file_handle.file_id().file_id(),
1694            &self.parquet_meta,
1695            row_group_idx,
1696            self.cache_strategy.clone(),
1697            &self.file_path,
1698            self.object_store.clone(),
1699        );
1700        // Fetches data into memory.
1701        row_group
1702            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
1703            .await
1704            .context(ReadParquetSnafu {
1705                path: &self.file_path,
1706            })?;
1707
1708        // Record total fetch elapsed time.
1709        if let Some(metrics) = fetch_metrics {
1710            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
1711        }
1712
1713        // Builds the parquet reader.
1714        // Now the row selection is None.
1715        ParquetRecordBatchReader::try_new_with_row_groups(
1716            &self.field_levels,
1717            &row_group,
1718            DEFAULT_READ_BATCH_SIZE,
1719            row_selection,
1720        )
1721        .context(ReadParquetSnafu {
1722            path: &self.file_path,
1723        })
1724    }
1725}
1726
/// The filter to evaluate or the prune result of the default value.
///
/// When the filtered column is missing from an SST, the filter is evaluated
/// once against the column's default value and collapses into [MaybeFilter::Matched]
/// or [MaybeFilter::Pruned] (see [SimpleFilterContext::new_opt]).
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value of the column.
    Matched,
    /// The filter is pruned (it rejects the column's default value).
    Pruned,
}
1736
/// Context to evaluate the column filter for a parquet file.
///
/// Built from a single predicate expression via [SimpleFilterContext::new_opt].
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}
1748
1749impl SimpleFilterContext {
1750    /// Creates a context for the `expr`.
1751    ///
1752    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1753    /// expected metadata.
1754    pub(crate) fn new_opt(
1755        sst_meta: &RegionMetadataRef,
1756        expected_meta: Option<&RegionMetadata>,
1757        expr: &Expr,
1758    ) -> Option<Self> {
1759        let filter = SimpleFilterEvaluator::try_new(expr)?;
1760        let (column_metadata, maybe_filter) = match expected_meta {
1761            Some(meta) => {
1762                // Gets the column metadata from the expected metadata.
1763                let column = meta.column_by_name(filter.column_name())?;
1764                // Checks if the column is present in the SST metadata. We still uses the
1765                // column from the expected metadata.
1766                match sst_meta.column_by_id(column.column_id) {
1767                    Some(sst_column) => {
1768                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1769
1770                        (column, MaybeFilter::Filter(filter))
1771                    }
1772                    None => {
1773                        // If the column is not present in the SST metadata, we evaluate the filter
1774                        // against the default value of the column.
1775                        // If we can't evaluate the filter, we return None.
1776                        if pruned_by_default(&filter, column)? {
1777                            (column, MaybeFilter::Pruned)
1778                        } else {
1779                            (column, MaybeFilter::Matched)
1780                        }
1781                    }
1782                }
1783            }
1784            None => {
1785                let column = sst_meta.column_by_name(filter.column_name())?;
1786                (column, MaybeFilter::Filter(filter))
1787            }
1788        };
1789
1790        Some(Self {
1791            filter: maybe_filter,
1792            column_id: column_metadata.column_id,
1793            semantic_type: column_metadata.semantic_type,
1794            data_type: column_metadata.column_schema.data_type.clone(),
1795        })
1796    }
1797
1798    /// Returns the filter to evaluate.
1799    pub(crate) fn filter(&self) -> &MaybeFilter {
1800        &self.filter
1801    }
1802
1803    /// Returns the column id.
1804    pub(crate) fn column_id(&self) -> ColumnId {
1805        self.column_id
1806    }
1807
1808    /// Returns the semantic type of the column.
1809    pub(crate) fn semantic_type(&self) -> SemanticType {
1810        self.semantic_type
1811    }
1812
1813    /// Returns the data type of the column.
1814    pub(crate) fn data_type(&self) -> &ConcreteDataType {
1815        &self.data_type
1816    }
1817}
1818
1819/// Prune a column by its default value.
1820/// Returns false if we can't create the default value or evaluate the filter.
1821fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1822    let value = column.column_schema.create_default().ok().flatten()?;
1823    let scalar_value = value
1824        .try_to_scalar_value(&column.column_schema.data_type)
1825        .ok()?;
1826    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1827    Some(!matches)
1828}
1829
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    /// Entries are popped (with their row selections) as row groups are consumed.
    selection: RowGroupSelection,
    /// Reader of current row group.
    /// `None` when the current row group is exhausted or nothing has been read yet.
    reader: Option<FlatPruneReader>,
    /// Metrics for tracking row group fetch operations.
    fetch_metrics: ParquetFetchMetrics,
}
1841
1842impl ParquetReader {
1843    #[tracing::instrument(
1844        skip_all,
1845        fields(
1846            region_id = %self.context.reader_builder().file_handle.region_id(),
1847            file_id = %self.context.reader_builder().file_handle.file_id()
1848        )
1849    )]
1850    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
1851        loop {
1852            if let Some(reader) = &mut self.reader {
1853                if let Some(batch) = reader.next_batch()? {
1854                    return Ok(Some(batch));
1855                }
1856                self.reader = None;
1857                continue;
1858            }
1859
1860            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
1861                return Ok(None);
1862            };
1863
1864            let parquet_reader = self
1865                .context
1866                .reader_builder()
1867                .build(
1868                    row_group_idx,
1869                    Some(row_selection),
1870                    Some(&self.fetch_metrics),
1871                )
1872                .await?;
1873
1874            let skip_fields = self.context.should_skip_fields(row_group_idx);
1875            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
1876                self.context.clone(),
1877                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
1878                skip_fields,
1879            ));
1880        }
1881    }
1882    /// Creates a new reader.
1883    #[tracing::instrument(
1884        skip_all,
1885        fields(
1886            region_id = %context.reader_builder().file_handle.region_id(),
1887            file_id = %context.reader_builder().file_handle.file_id()
1888        )
1889    )]
1890    pub(crate) async fn new(
1891        context: FileRangeContextRef,
1892        mut selection: RowGroupSelection,
1893    ) -> Result<Self> {
1894        debug_assert!(context.read_format().as_flat().is_some());
1895        let fetch_metrics = ParquetFetchMetrics::default();
1896        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
1897            let parquet_reader = context
1898                .reader_builder()
1899                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
1900                .await?;
1901            let skip_fields = context.should_skip_fields(row_group_idx);
1902            Some(FlatPruneReader::new_with_row_group_reader(
1903                context.clone(),
1904                FlatRowGroupReader::new(context.clone(), parquet_reader),
1905                skip_fields,
1906            ))
1907        } else {
1908            None
1909        };
1910
1911        Ok(ParquetReader {
1912            context,
1913            selection,
1914            reader,
1915            fetch_metrics,
1916        })
1917    }
1918
1919    /// Returns the metadata of the SST.
1920    pub fn metadata(&self) -> &RegionMetadataRef {
1921        self.context.read_format().metadata()
1922    }
1923
1924    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
1925        self.context.reader_builder().parquet_meta.clone()
1926    }
1927}
1928
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    /// Converts an arrow-level read result into this crate's [Result],
    /// attaching context (such as the file path) to errors.
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    /// Returns the [ReadFormat] used to decode record batches.
    fn read_format(&self) -> &ReadFormat;
}
1939
1940impl RowGroupReaderContext for FileRangeContextRef {
1941    fn map_result(
1942        &self,
1943        result: std::result::Result<Option<RecordBatch>, ArrowError>,
1944    ) -> Result<Option<RecordBatch>> {
1945        result.context(ArrowReaderSnafu {
1946            path: self.file_path(),
1947        })
1948    }
1949
1950    fn read_format(&self) -> &ReadFormat {
1951        self.as_ref().read_format()
1952    }
1953}
1954
/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    ///
    /// Thin wrapper around [RowGroupReaderBase::create] for the
    /// [FileRangeContextRef] context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}
1964
/// Reader to read a row group of a parquet file.
///
/// Decodes arrow [RecordBatch]es from the parquet reader and converts them
/// into [Batch]es of the primary key format.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of [RowGroupReader] so adapts to different underlying implementation.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    /// A single record batch may be converted into multiple [Batch]es.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    /// `None` when no sequence override is configured.
    override_sequence: Option<ArrayRef>,
}
1978
1979impl<T> RowGroupReaderBase<T>
1980where
1981    T: RowGroupReaderContext,
1982{
1983    /// Creates a new reader to read the primary key format.
1984    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
1985        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
1986        let override_sequence = context
1987            .read_format()
1988            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
1989        assert!(context.read_format().as_primary_key().is_some());
1990
1991        Self {
1992            context,
1993            reader,
1994            batches: VecDeque::new(),
1995            metrics: ReaderMetrics::default(),
1996            override_sequence,
1997        }
1998    }
1999
2000    /// Gets the metrics.
2001    pub(crate) fn metrics(&self) -> &ReaderMetrics {
2002        &self.metrics
2003    }
2004
2005    /// Gets [ReadFormat] of underlying reader.
2006    pub(crate) fn read_format(&self) -> &ReadFormat {
2007        self.context.read_format()
2008    }
2009
2010    /// Tries to fetch next [RecordBatch] from the reader.
2011    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2012        self.context.map_result(self.reader.next().transpose())
2013    }
2014
2015    /// Returns the next [Batch].
2016    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
2017        let scan_start = Instant::now();
2018        if let Some(batch) = self.batches.pop_front() {
2019            self.metrics.num_rows += batch.num_rows();
2020            self.metrics.scan_cost += scan_start.elapsed();
2021            return Ok(Some(batch));
2022        }
2023
2024        // We need to fetch next record batch and convert it to batches.
2025        while self.batches.is_empty() {
2026            let Some(record_batch) = self.fetch_next_record_batch()? else {
2027                self.metrics.scan_cost += scan_start.elapsed();
2028                return Ok(None);
2029            };
2030            self.metrics.num_record_batches += 1;
2031
2032            // Safety: We ensures the format is primary key in the RowGroupReaderBase::create().
2033            self.context
2034                .read_format()
2035                .as_primary_key()
2036                .unwrap()
2037                .convert_record_batch(
2038                    &record_batch,
2039                    self.override_sequence.as_ref(),
2040                    &mut self.batches,
2041                )?;
2042            self.metrics.num_batches += self.batches.len();
2043        }
2044        let batch = self.batches.pop_front();
2045        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
2046        self.metrics.scan_cost += scan_start.elapsed();
2047        Ok(batch)
2048    }
2049}
2050
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        // Delegates to the synchronous implementation; no await points here.
        self.next_inner()
    }
}
2060
/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    /// `None` when no sequence override is configured.
    override_sequence: Option<ArrayRef>,
}
2070
2071impl FlatRowGroupReader {
2072    /// Creates a new flat reader from file range.
2073    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
2074        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2075        let override_sequence = context
2076            .read_format()
2077            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2078
2079        Self {
2080            context,
2081            reader,
2082            override_sequence,
2083        }
2084    }
2085
2086    /// Returns the next RecordBatch.
2087    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2088        match self.reader.next() {
2089            Some(batch_result) => {
2090                let record_batch = batch_result.context(ArrowReaderSnafu {
2091                    path: self.context.file_path(),
2092                })?;
2093
2094                // Safety: Only flat format use FlatRowGroupReader.
2095                let flat_format = self.context.read_format().as_flat().unwrap();
2096                let record_batch =
2097                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
2098                Ok(Some(record_batch))
2099            }
2100            None => Ok(None),
2101        }
2102    }
2103}
2104
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::fmt::{Debug, Formatter};
    use std::sync::{Arc, LazyLock};

    use datafusion::arrow::datatypes::DataType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{
        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
    };
    use datatypes::arrow::array::{ArrayRef, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::services::Memory;
    use parquet::arrow::ArrowWriter;
    use store_api::region_request::PathType;
    use table::predicate::Predicate;

    use super::*;
    use crate::sst::parquet::metadata::MetadataLoader;
    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};

    // Regression test: when the index result cache is disabled, min-max
    // pruning must not build a `PredicateKey` (which formats the predicate
    // exprs). The test plants a UDF whose `Debug` impl panics, so any
    // accidental formatting of the predicate aborts the test.
    #[tokio::test(flavor = "current_thread")]
    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
        // UDF that panics if it is ever formatted via `Debug`.
        #[derive(Eq, PartialEq, Hash)]
        struct PanicDebugUdf;

        impl Debug for PanicDebugUdf {
            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
                panic!("minmax predicate key should not format exprs when cache is disabled");
            }
        }

        // Minimal ScalarUDFImpl: accepts anything, always returns Int64 1.
        impl ScalarUDFImpl for PanicDebugUdf {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                "panic_debug_udf"
            }

            fn signature(&self) -> &Signature {
                static SIGNATURE: LazyLock<Signature> =
                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
                &SIGNATURE
            }

            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
            }
        }

        // Write a small single-column parquet file into an in-memory store.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_handle = sst_file_handle(0, 1);
        let table_dir = "test_table".to_string();
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(&table_dir, path_type);

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let mut parquet_bytes = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_size = parquet_bytes.len() as u64;
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format =
            ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();

        // Load the parquet metadata directly from the in-memory store.
        let mut cache_metrics = MetadataCacheMetrics::default();
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();

        // Predicate containing the panicking UDF; the cache is disabled, so
        // the predicate must never be formatted into a cache key.
        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
            udf,
            vec![],
        ))]);
        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
            .predicate(Some(predicate))
            .cache(CacheStrategy::Disabled);

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
        let mut metrics = ReaderFilterMetrics::default();
        // Would panic via PanicDebugUdf's Debug impl if a predicate key were built.
        let selection = builder.row_groups_by_minmax(
            &read_format,
            &parquet_meta,
            row_group_size,
            total_row_count,
            &mut metrics,
            false,
        );

        // The UDF predicate can't prune anything, so all row groups remain.
        assert!(!selection.is_empty());
    }
}