mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
    row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
use crate::sst::tag_maybe_to_dictionary_field;

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";

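/// Logs a failure to apply an index, or panics in test builds so that index
/// errors surface immediately during testing instead of being silently skipped.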
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
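///
/// # Example
///
/// A minimal usage sketch; `table_dir`, `path_type`, `file_handle`,
/// `object_store` and `predicate` are assumed to be provided by the caller:
///
/// ```ignore
/// let mut reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
///     .predicate(predicate)
///     .cache(CacheStrategy::Disabled)
///     .build()
///     .await?;
/// while let Some(batch) = reader.next_batch().await? {
///     // Consume the decoded batches.
/// }
/// ```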
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    page_index_policy: PageIndexPolicy,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
            page_index_policy: Default::default(),
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the vector index applier to the builder.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
        k: Option<usize>,
    ) -> Self {
        self.vector_index_applier = applier;
        self.vector_index_k = k;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Decodes primary key values eagerly when reading primary key format SSTs.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

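    /// Sets the page index policy to use when loading parquet metadata.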
    #[must_use]
    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.page_index_policy = page_index_policy;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This performs I/O operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This performs I/O operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let (parquet_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let region_partition_expr = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr.as_ref().map(|expr| expr.as_str()),
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        // Skip auto convert when:
        // - compaction is enabled
        // - the region partition expr is the same as the file partition expr (no need to auto convert)
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Build a compaction projection helper when:
        // - compaction is enabled
        // - the region partition expr differs from the file partition expr
        // - flat format is enabled
        // - the primary key encoding is sparse
        //
        // This is applied after row-group filtering to align batches with the flat output schema
        // before compat handling.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && self.flat_format
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            // Lists all column ids to read; we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume we don't have nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        // Trigger a background download if the metadata had a cache miss and the selection is not empty.
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().clone()
        } else {
            Arc::new(vec![])
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

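    /// Parses the region partition expression (if any) and returns it together
    /// with whether it equals the file's partition expression.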
    fn is_same_region_partition(
        region_partition_expr_str: Option<&str>,
        file_partition_expr: Option<&PartitionExpr>,
    ) -> Result<(Option<PartitionExpr>, bool)> {
        let region_partition_expr = match region_partition_expr_str {
            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
            None => None,
        };

        let is_same = region_partition_expr.as_ref() == file_partition_expr;
        Ok((region_partition_expr, is_same))
    }

    /// Compares the partition expressions from the expected metadata and the file metadata,
    /// and builds a partition filter if they differ.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collect columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a `partition_schema` containing only referenced columns.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }

    /// Decodes region metadata from the key-value metadata.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads the parquet metadata of a specific file.
    /// Returns `(metadata, cache_miss_flag)`.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<ParquetMetaData>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

    /// Computes row groups to read, along with their respective row selections.
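    ///
    /// Pruning is applied in stages: min-max statistics first, then the
    /// fine-grained fulltext index, the inverted index, the bloom filter
    /// index, the coarse fulltext (bloom) pass (only when the fine-grained
    /// fulltext pass did not filter), and finally the vector index when the
    /// `vector_index` feature is enabled. The computation returns early once
    /// the selection becomes empty.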
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        // Compute `skip_fields` once for all pruning operations.
        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }

    /// Prunes row groups by the fulltext index (fine-grained). Returns `true` if the index was applied.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
    /// as an escape route in case of index issues, and it can be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

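    /// Prunes row groups by the bloom filter index. Returns `true` if the index was applied.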
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not
                    // already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    /// Prunes row groups by vector index results.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }

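    /// Prunes row groups by the coarse fulltext (bloom) index. Returns `true` if the index was applied.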
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not
                    // already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes whether to skip field columns when building statistics, based on [PreFilterMode].
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains a delete op.
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

    /// Prunes row groups by min-max index.
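    /// Returns `false` when there is no predicate to prune with.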
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

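    /// Applies an index `result` to `output`, updates the filter metrics, and
    /// stores the result in the index result cache keyed by `predicate_key`.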
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

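/// Intersects `output` with `result`, records how many row groups and rows the
/// index filtered out under `index_type`, and replaces `output` with the intersection.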
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

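/// Converts raw row offsets returned by the vector index into a
/// [RowGroupSelection], failing if any offset exceeds `u32::MAX`.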
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let mut row_ids = BTreeSet::new();
    for offset in row_offsets {
        let row_id = u32::try_from(offset).map_err(|_| {
            ApplyVectorIndexSnafu {
                reason: format!("Row offset {} exceeds u32::MAX", offset),
            }
            .build()
        })?;
        row_ids.insert(row_id);
    }
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}

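/// Returns `true` if every non-empty row group required by `required_row_groups`
/// has already been searched (i.e. is covered by `cached_row_groups`).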
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows does not need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;
        self.rg_vector_filtered += other.rg_vector_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_vector_filtered += other.rows_vector_filtered;
        self.rows_vector_selected += other.rows_vector_selected;
        self.rows_precise_filtered += other.rows_precise_filtered;

        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;

        // Merge optional applier metrics.
        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }

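    /// Accumulates the number of row groups and rows filtered by the given index type.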
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            INDEX_TYPE_VECTOR => {
                self.rg_vector_filtered += row_group_count;
                self.rows_vector_filtered += row_count;
            }
            _ => {}
        }
    }
}

#[cfg(all(test, feature = "vector_index"))]
mod tests {
    use super::*;

    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}

/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations performed.
    pub(crate) num_reads: usize,
    /// Total bytes read from storage.
    pub(crate) bytes_read: u64,
}

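// The `Debug` output is a compact JSON-like object that omits zero-valued
// fields, e.g. `{"metadata_load_cost":"1.2ms", "num_reads":2}` (illustrative
// values).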
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }
        if *num_reads > 0 {
            write!(f, ", \"num_reads\":{}", num_reads)?;
        }
        if *bytes_read > 0 {
            write!(f, ", \"bytes_read\":{}", bytes_read)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true if the metrics are empty, i.e. no metadata load cost has
    /// been recorded yet.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Merges `other` into these metrics.
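    ///
    /// A minimal aggregation sketch (not compiled; `per_range` is a
    /// hypothetical iterator of per-range metrics):
    /// ```ignore
    /// let mut total = MetadataCacheMetrics::default();
    /// for m in per_range {
    ///     total.merge_from(&m);
    /// }
    /// ```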
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
        self.num_reads += other.num_reads;
        self.bytes_read += other.bytes_read;
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics for parquet metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Optional metrics for page/row group fetch operations.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Memory size of metadata loaded for building file ranges.
    pub(crate) metadata_mem_size: isize,
    /// Number of file range builders created.
    pub(crate) num_range_builders: isize,
}

impl ReaderMetrics {
    /// Merges `other` into these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
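        // Note: when `self` has no fetch metrics yet, it clones the `Arc` from
        // `other`, so both ends share the same underlying counters afterwards.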
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
        self.metadata_mem_size += other.metadata_mem_size;
        self.num_range_builders += other.num_range_builders;
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so that the underlying file won't be purged while
    /// it is still being read.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

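    /// Returns the parquet metadata of the file.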
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

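    /// Returns the cache strategy.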
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
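    ///
    /// Call-site sketch (not compiled; `builder` is a [RowGroupReaderBuilder]
    /// and an async context is assumed):
    /// ```ignore
    /// let mut reader = builder.build(0, None, None).await?;
    /// while let Some(batch) = reader.next().transpose()? {
    ///     // `batch` is an arrow RecordBatch from row group 0.
    /// }
    /// ```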
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Record total fetch elapsed time.
        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        // Builds the parquet reader with the optional row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate, or the result of evaluating it against the
/// column's default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value, so the column is not filtered.
    Matched,
    /// The filter can never match the default value, so the data is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column metadata from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
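    ///
    /// Callers usually match on the tri-state result; a sketch (not compiled;
    /// `scalar` is a hypothetical value taken from statistics):
    /// ```ignore
    /// match ctx.filter() {
    ///     MaybeFilter::Filter(f) => {
    ///         let matches = f.evaluate_scalar(&scalar)?;
    ///     }
    ///     MaybeFilter::Matched => { /* default value matches; keep the data */ }
    ///     MaybeFilter::Pruned => { /* can never match; prune the data */ }
    /// }
    /// ```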
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter,
/// and `Some(true)` if the filter can never match the default value.
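/// For example, if the column's default value is `0` and the filter is
/// `col > 5`, the default can never match, so this returns `Some(true)`.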
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of current row group.
    reader_state: ReaderState,
    /// Metrics for tracking row group fetch operations.
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in current row group, reads next row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                )
                .await?;

            // Resets the parquet reader.
            // Computes skip_fields for this row group.
            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        // Builds a reader for the first row group in the selection, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            // Computes skip_fields once for this row group.
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

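    /// Returns the parquet metadata of the underlying SST file.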
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of [RowGroupReader] that adapts it to different underlying implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets [ReadFormat] of underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
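        // `ParquetRecordBatchReader` is an `Iterator` of `Result<RecordBatch>`;
        // `transpose` turns the `Option<Result<_>>` into `Result<Option<_>>` so
        // the context can attach the file path to any error.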
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch the next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is primary key in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next RecordBatch.
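    ///
    /// Drain-loop sketch (not compiled):
    /// ```ignore
    /// while let Some(rb) = flat_reader.next_batch()? {
    ///     // `rb` is a flat-format RecordBatch with sequences overridden when
    ///     // needed.
    /// }
    /// ```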
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Safety: Only the flat format uses FlatRowGroupReader.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}