mito2/sst/parquet/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet reader.
16
17#[cfg(feature = "vector_index")]
18use std::collections::BTreeSet;
19use std::collections::{HashSet, VecDeque};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use async_trait::async_trait;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_telemetry::{debug, tracing, warn};
27use datafusion_expr::Expr;
28use datatypes::arrow::array::ArrayRef;
29use datatypes::arrow::datatypes::Field;
30use datatypes::arrow::error::ArrowError;
31use datatypes::arrow::record_batch::RecordBatch;
32use datatypes::data_type::ConcreteDataType;
33use datatypes::prelude::DataType;
34use mito_codec::row_converter::build_primary_key_codec;
35use object_store::ObjectStore;
36use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
37use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
38use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
39use partition::expr::PartitionExpr;
40use snafu::{OptionExt, ResultExt};
41use store_api::codec::PrimaryKeyEncoding;
42use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
43use store_api::region_request::PathType;
44use store_api::storage::{ColumnId, FileId};
45use table::predicate::Predicate;
46
47use crate::cache::CacheStrategy;
48use crate::cache::index::result_cache::PredicateKey;
49#[cfg(feature = "vector_index")]
50use crate::error::ApplyVectorIndexSnafu;
51use crate::error::{
52    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
53    ReadParquetSnafu, Result, SerializePartitionExprSnafu,
54};
55use crate::metrics::{
56    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
57    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
58};
59use crate::read::flat_projection::CompactionProjectionMapper;
60use crate::read::prune::{PruneReader, Source};
61use crate::read::{Batch, BatchReader};
62use crate::sst::file::FileHandle;
63use crate::sst::index::bloom_filter::applier::{
64    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
65};
66use crate::sst::index::fulltext_index::applier::{
67    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
68};
69use crate::sst::index::inverted_index::applier::{
70    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
71};
72#[cfg(feature = "vector_index")]
73use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
74use crate::sst::parquet::file_range::{
75    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
76    row_group_contains_delete,
77};
78use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
79use crate::sst::parquet::metadata::MetadataLoader;
80use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
81use crate::sst::parquet::row_selection::RowGroupSelection;
82use crate::sst::parquet::stats::RowGroupPruningStats;
83use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
84use crate::sst::tag_maybe_to_dictionary_field;
85
86const INDEX_TYPE_FULLTEXT: &str = "fulltext";
87const INDEX_TYPE_INVERTED: &str = "inverted";
88const INDEX_TYPE_BLOOM: &str = "bloom filter";
89const INDEX_TYPE_VECTOR: &str = "vector";
90
/// Reports an error raised while applying an index to an SST file.
///
/// In test builds (`test` cfg or the `test` feature) a broken index is treated
/// as a bug, so we panic to surface it loudly; in production we only log a
/// warning because the caller can continue without the index result (the
/// pruning loops `continue` after invoking this macro).
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
111
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Handle to the SST file to read.
    file_handle: FileHandle,
    /// Object store to read the SST file from.
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    /// For each pair, the first applier is applied even when field columns are
    /// skipped (tags); the second is only applied when fields are read.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, same slot convention as above.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, same slot convention as above.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    /// Policy controlling whether parquet page indexes are loaded with the metadata.
    page_index_policy: PageIndexPolicy,
}
153
154impl ParquetReaderBuilder {
155    /// Returns a new [ParquetReaderBuilder] to read specific SST.
156    pub fn new(
157        table_dir: String,
158        path_type: PathType,
159        file_handle: FileHandle,
160        object_store: ObjectStore,
161    ) -> ParquetReaderBuilder {
162        ParquetReaderBuilder {
163            table_dir,
164            path_type,
165            file_handle,
166            object_store,
167            predicate: None,
168            projection: None,
169            cache_strategy: CacheStrategy::Disabled,
170            inverted_index_appliers: [None, None],
171            bloom_filter_index_appliers: [None, None],
172            fulltext_index_appliers: [None, None],
173            #[cfg(feature = "vector_index")]
174            vector_index_applier: None,
175            #[cfg(feature = "vector_index")]
176            vector_index_k: None,
177            expected_metadata: None,
178            flat_format: false,
179            compaction: false,
180            pre_filter_mode: PreFilterMode::All,
181            decode_primary_key_values: false,
182            page_index_policy: Default::default(),
183        }
184    }
185
186    /// Attaches the predicate to the builder.
187    #[must_use]
188    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
189        self.predicate = predicate;
190        self
191    }
192
193    /// Attaches the projection to the builder.
194    ///
195    /// The reader only applies the projection to fields.
196    #[must_use]
197    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
198        self.projection = projection;
199        self
200    }
201
202    /// Attaches the cache to the builder.
203    #[must_use]
204    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
205        self.cache_strategy = cache;
206        self
207    }
208
209    /// Attaches the inverted index appliers to the builder.
210    #[must_use]
211    pub(crate) fn inverted_index_appliers(
212        mut self,
213        index_appliers: [Option<InvertedIndexApplierRef>; 2],
214    ) -> Self {
215        self.inverted_index_appliers = index_appliers;
216        self
217    }
218
219    /// Attaches the bloom filter index appliers to the builder.
220    #[must_use]
221    pub(crate) fn bloom_filter_index_appliers(
222        mut self,
223        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
224    ) -> Self {
225        self.bloom_filter_index_appliers = index_appliers;
226        self
227    }
228
229    /// Attaches the fulltext index appliers to the builder.
230    #[must_use]
231    pub(crate) fn fulltext_index_appliers(
232        mut self,
233        index_appliers: [Option<FulltextIndexApplierRef>; 2],
234    ) -> Self {
235        self.fulltext_index_appliers = index_appliers;
236        self
237    }
238
239    /// Attaches the vector index applier to the builder.
240    #[cfg(feature = "vector_index")]
241    #[must_use]
242    pub(crate) fn vector_index_applier(
243        mut self,
244        applier: Option<VectorIndexApplierRef>,
245        k: Option<usize>,
246    ) -> Self {
247        self.vector_index_applier = applier;
248        self.vector_index_k = k;
249        self
250    }
251
252    /// Attaches the expected metadata to the builder.
253    #[must_use]
254    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
255        self.expected_metadata = expected_metadata;
256        self
257    }
258
259    /// Sets the flat format flag.
260    #[must_use]
261    pub fn flat_format(mut self, flat_format: bool) -> Self {
262        self.flat_format = flat_format;
263        self
264    }
265
266    /// Sets the compaction flag.
267    #[must_use]
268    pub fn compaction(mut self, compaction: bool) -> Self {
269        self.compaction = compaction;
270        self
271    }
272
273    /// Sets the pre-filter mode.
274    #[must_use]
275    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
276        self.pre_filter_mode = pre_filter_mode;
277        self
278    }
279
280    /// Decodes primary key values eagerly when reading primary key format SSTs.
281    #[must_use]
282    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
283        self.decode_primary_key_values = decode;
284        self
285    }
286
287    #[must_use]
288    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
289        self.page_index_policy = page_index_policy;
290        self
291    }
292
293    /// Builds a [ParquetReader].
294    ///
295    /// This needs to perform IO operation.
296    #[tracing::instrument(
297        skip_all,
298        fields(
299            region_id = %self.file_handle.region_id(),
300            file_id = %self.file_handle.file_id()
301        )
302    )]
303    pub async fn build(&self) -> Result<ParquetReader> {
304        let mut metrics = ReaderMetrics::default();
305
306        let (context, selection) = self.build_reader_input(&mut metrics).await?;
307        ParquetReader::new(Arc::new(context), selection).await
308    }
309
    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operation.
    ///
    /// Pipeline: load parquet metadata (cache-first) -> decode the region
    /// metadata embedded in the SST -> build the [ReadFormat] and projection ->
    /// prune row groups via statistics and indexes -> assemble the range context
    /// with the pushed-down filters.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        // `cache_miss` is true only when the metadata had to be loaded from the
        // object store; it is used below to kick off a background file download.
        let (parquet_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        // Partition expr of the region (from the expected metadata), compared
        // against the partition expr recorded in the file meta.
        let region_partition_expr = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr.as_ref().map(|expr| expr.as_str()),
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        // Skip auto convert when:
        // - compaction is enabled
        // - region partition expr is same with file partition expr (no need to auto convert)
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Build a compaction projection helper when:
        // - compaction is enabled
        // - region partition expr differs from file partition expr
        // - flat format is enabled
        // - primary key encoding is sparse
        //
        // This is applied after row-group filtering to align batches with flat output schema
        // before compat handling.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && self.flat_format
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            // Lists all column ids to read, we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        // Override the sequence column with the file-level sequence when the
        // parquet metadata indicates it is required.
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // Now we assumes we don't have nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        // Prunes row groups with statistics and indexes.
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        // Trigger background download if metadata had a cache miss and selection is not empty
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        // Prefer the expected (latest) schema for pruning; fall back to the
        // schema stored in the SST.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        // Converts pushed-down exprs into simple filters; exprs that cannot be
        // converted are silently dropped (they are only an optimization).
        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().clone()
        } else {
            Arc::new(vec![])
        };

        let codec = build_primary_key_codec(read_format.metadata());

        // Extra filter applied when the region partition expr differs from the
        // file partition expr (e.g. after repartitioning).
        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }
501
502    fn is_same_region_partition(
503        region_partition_expr_str: Option<&str>,
504        file_partition_expr: Option<&PartitionExpr>,
505    ) -> Result<(Option<PartitionExpr>, bool)> {
506        let region_partition_expr = match region_partition_expr_str {
507            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
508            None => None,
509        };
510
511        let is_same = region_partition_expr.as_ref() == file_partition_expr;
512        Ok((region_partition_expr, is_same))
513    }
514
    /// Compare partition expressions from expected metadata and file metadata,
    /// and build a partition filter if they differ.
    ///
    /// Returns `None` when the expressions match (nothing to filter) or when the
    /// region has no partition expression at all.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        // Same expression on both sides: the file's rows already satisfy the
        // region partition, so no extra filter is needed.
        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collect columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a partition_schema containing only referenced columns.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    // In flat format, string tag columns may be represented as
                    // dictionary arrays (see `tag_maybe_to_dictionary_field`), so
                    // the schema used to evaluate the filter mirrors that type.
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        // Lower the partition expression to a physical expression against the
        // reduced schema built above.
        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }
584
585    /// Decodes region metadata from key value.
586    fn get_region_metadata(
587        file_path: &str,
588        key_value_meta: Option<&Vec<KeyValue>>,
589    ) -> Result<RegionMetadata> {
590        let key_values = key_value_meta.context(InvalidParquetSnafu {
591            file: file_path,
592            reason: "missing key value meta",
593        })?;
594        let meta_value = key_values
595            .iter()
596            .find(|kv| kv.key == PARQUET_METADATA_KEY)
597            .with_context(|| InvalidParquetSnafu {
598                file: file_path,
599                reason: format!("key {} not found", PARQUET_METADATA_KEY),
600            })?;
601        let json = meta_value
602            .value
603            .as_ref()
604            .with_context(|| InvalidParquetSnafu {
605                file: file_path,
606                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
607            })?;
608
609        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
610    }
611
    /// Reads parquet metadata of specific file.
    /// Returns (metadata, cache_miss_flag).
    ///
    /// Tries the metadata cache first; on a miss it loads the footer from the
    /// object store and writes the result back to the cache. The flag is `true`
    /// only when the slow (load) path ran.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<ParquetMetaData>, bool)> {
        let start = Instant::now();
        // Observes the elapsed time of this stage on drop.
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }
651
    /// Computes row groups to read, along with their respective row selections.
    ///
    /// Applies pruning stages in order — min/max statistics, fulltext index,
    /// inverted index, bloom filter, fulltext bloom, and (when enabled) the
    /// vector index — returning early as soon as the selection becomes empty.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        // Nothing to read: return an empty selection.
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        // Start with everything selected; each stage below narrows it down.
        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        // `fulltext_filtered` records whether the fine-grained fulltext index
        // ran, so the coarser fulltext bloom stage can be skipped below.
        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }
760
    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    ///
    /// For each applier: first consults the index result cache; on a miss it
    /// applies the index against the file and caches/applies the new selection.
    /// Index errors are logged (or panic in tests) and the applier is skipped,
    /// so a broken index never fails the read.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        // No fulltext index for this file: nothing to prune.
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // The cached result is only usable if it covers every row group the
            // current selection still needs.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                // `Ok(None)` means the index did not produce a result; skip it.
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
830
    /// Applies index to prune row groups.
    ///
    /// Intersects the inverted-index selection into `output` and returns `true`
    /// if at least one applier produced a result (cached or freshly applied).
    /// Errors from an applier are logged via `handle_index_error!` and that
    /// applier is skipped rather than failing the read.
    ///
    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
    /// as an escape route in case of index issues, and it can be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        // Nothing to prune if the file carries no inverted index.
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        // NOTE(review): `[..1]` panics if `inverted_index_appliers` is empty —
        // presumably the builder always populates the tag slot; confirm.
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Only take the fast path when the cached result covers every row
            // group that `output` still requires.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    // Index failure is non-fatal: log and fall through to the
                    // next applier without pruning.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }
905
    /// Applies bloom filter indexes to prune row groups.
    ///
    /// Intersects the bloom-filter selection into `output` and returns `true`
    /// if at least one applier produced a result (cached or freshly applied).
    /// Unlike the inverted-index path, a partially cached result is still
    /// useful here: only the row groups missing from the cache are searched,
    /// then the fresh and cached selections are concatenated.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        // Nothing to prune if the file carries no bloom filter index.
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        // NOTE(review): `[..1]` panics if `bloom_filter_index_appliers` is
        // empty — presumably the builder always populates the tag slot; confirm.
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: (row count, whether to search it).
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    // Index failure is non-fatal: log and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // New searched row groups are added to `selection`, concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }
990
    /// Prunes row groups by vector index results.
    ///
    /// Applies the top-`k` vector search result to `output`. Unlike the other
    /// index paths this one does not consult or update the index result cache
    /// and returns nothing; failures are logged and the selection is left
    /// untouched.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        // All three preconditions must hold: an applier, a `k`, and an index blob.
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        // Conversion fails only when an offset exceeds u32::MAX.
        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }
1033
    /// Applies coarse (bloom-based) fulltext indexes to prune row groups.
    ///
    /// Mirrors `prune_row_groups_by_bloom_filter`: searches only the row
    /// groups still required by `output` and absent from the cache, then
    /// concatenates fresh and cached selections. Returns `true` if at least
    /// one applier produced a result. `apply_coarse` returning `Ok(None)`
    /// means the applier is not applicable to this file and is skipped.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        // Nothing to prune if the file carries no fulltext index.
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        // NOTE(review): `[..1]` panics if `fulltext_index_appliers` is empty —
        // presumably the builder always populates the tag slot; confirm.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: (row count, whether to search it).
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search the row group that required by `output` and not stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    // Index failure is non-fatal: log and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // New searched row groups are added to `selection`, concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
1119
1120    /// Computes whether to skip field columns when building statistics based on PreFilterMode.
1121    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
1122        match self.pre_filter_mode {
1123            PreFilterMode::All => false,
1124            PreFilterMode::SkipFields => true,
1125            PreFilterMode::SkipFieldsOnDelete => {
1126                // Check if any row group contains delete op
1127                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
1128                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1129                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
1130                        .inspect_err(|e| {
1131                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
1132                        })
1133                        .unwrap_or(false)
1134                })
1135            }
1136        }
1137    }
1138
1139    /// Prunes row groups by min-max index.
1140    fn prune_row_groups_by_minmax(
1141        &self,
1142        read_format: &ReadFormat,
1143        parquet_meta: &ParquetMetaData,
1144        output: &mut RowGroupSelection,
1145        metrics: &mut ReaderFilterMetrics,
1146        skip_fields: bool,
1147    ) -> bool {
1148        let Some(predicate) = &self.predicate else {
1149            return false;
1150        };
1151
1152        let row_groups_before = output.row_group_count();
1153
1154        let region_meta = read_format.metadata();
1155        let row_groups = parquet_meta.row_groups();
1156        let stats = RowGroupPruningStats::new(
1157            row_groups,
1158            read_format,
1159            self.expected_metadata.clone(),
1160            skip_fields,
1161        );
1162        let prune_schema = self
1163            .expected_metadata
1164            .as_ref()
1165            .map(|meta| meta.schema.arrow_schema())
1166            .unwrap_or_else(|| region_meta.schema.arrow_schema());
1167
1168        // Here we use the schema of the SST to build the physical expression. If the column
1169        // in the SST doesn't have the same column id as the column in the expected metadata,
1170        // we will get a None statistics for that column.
1171        predicate
1172            .prune_with_stats(&stats, prune_schema)
1173            .iter()
1174            .zip(0..parquet_meta.num_row_groups())
1175            .for_each(|(mask, row_group)| {
1176                if !*mask {
1177                    output.remove_row_group(row_group);
1178                }
1179            });
1180
1181        let row_groups_after = output.row_group_count();
1182        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;
1183
1184        true
1185    }
1186
1187    fn apply_index_result_and_update_cache(
1188        &self,
1189        predicate_key: &PredicateKey,
1190        file_id: FileId,
1191        result: RowGroupSelection,
1192        output: &mut RowGroupSelection,
1193        metrics: &mut ReaderFilterMetrics,
1194        index_type: &str,
1195    ) {
1196        apply_selection_and_update_metrics(output, &result, metrics, index_type);
1197
1198        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1199            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1200        }
1201    }
1202}
1203
1204fn apply_selection_and_update_metrics(
1205    output: &mut RowGroupSelection,
1206    result: &RowGroupSelection,
1207    metrics: &mut ReaderFilterMetrics,
1208    index_type: &str,
1209) {
1210    let intersection = output.intersect(result);
1211
1212    let row_group_count = output.row_group_count() - intersection.row_group_count();
1213    let row_count = output.row_count() - intersection.row_count();
1214
1215    metrics.update_index_metrics(index_type, row_group_count, row_count);
1216
1217    *output = intersection;
1218}
1219
/// Converts vector-index row offsets into a [RowGroupSelection].
///
/// Each `u64` offset must fit in a `u32` row id; the first offset exceeding
/// `u32::MAX` aborts the conversion with an `ApplyVectorIndex` error.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    // Fallible collect short-circuits on the first out-of-range offset,
    // matching the behavior of an early-returning loop.
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<u32>>>()?;
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1242
1243fn all_required_row_groups_searched(
1244    required_row_groups: &RowGroupSelection,
1245    cached_row_groups: &RowGroupSelection,
1246) -> bool {
1247    required_row_groups.iter().all(|(rg_id, _)| {
1248        // Row group with no rows is not required to search.
1249        !required_row_groups.contains_non_empty_row_group(*rg_id)
1250            // The row group is already searched.
1251            || cached_row_groups.contains_row_group(*rg_id)
1252    })
1253}
1254
/// Metrics of filtering row groups and rows.
///
/// Counters are cumulative and merged across readers via [`Self::merge_from`];
/// `observe()` flushes them to process-wide Prometheus counters.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index (the top-k result size),
    /// as opposed to `rows_vector_filtered`, which counts removed rows.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
}
1313
impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;
        self.rg_vector_filtered += other.rg_vector_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_vector_filtered += other.rows_vector_filtered;
        self.rows_vector_selected += other.rows_vector_selected;
        self.rows_precise_filtered += other.rows_precise_filtered;

        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;

        self.pruner_cache_hit += other.pruner_cache_hit;
        self.pruner_cache_miss += other.pruner_cache_miss;
        self.pruner_prune_cost += other.pruner_prune_cost;

        // Merge optional applier metrics: lazily materialize our side with
        // defaults so the other side's numbers are never dropped.
        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports metrics.
    ///
    /// Flushes the accumulated counters into the process-wide Prometheus
    /// counter vectors, one label per filter stage.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }

    /// Routes `row_group_count`/`row_count` into the counters for `index_type`.
    ///
    /// Unknown index types are deliberately ignored (callers pass one of the
    /// `INDEX_TYPE_*` constants).
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            INDEX_TYPE_VECTOR => {
                self.rg_vector_filtered += row_group_count;
                self.rows_vector_filtered += row_count;
            }
            _ => {}
        }
    }
}
1424
#[cfg(all(test, feature = "vector_index"))]
mod tests {
    use super::*;

    /// Offsets spread over three row groups of size 4 produce a selection
    /// covering all three groups and exactly the four given rows.
    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    /// An offset larger than `u32::MAX` must fail the conversion.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    /// Intersecting a 1-row selection into an 8-row output prunes 1 row group
    /// and 7 rows, attributed to the vector index counters.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1476
/// Metrics for parquet metadata cache operations.
///
/// `Copy` by design: all fields are plain counters/durations, so merging and
/// snapshotting are cheap.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations performed.
    pub(crate) num_reads: usize,
    /// Total bytes read from storage.
    pub(crate) bytes_read: u64,
}
1493
// Compact JSON-like rendering that always shows the load cost and omits
// zero-valued counters to keep log lines short.
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a struct field forces this impl
        // to be revisited.
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = self;

        // NOTE(review): `is_empty()` only checks `metadata_load_cost`, so
        // counters recorded with a zero load cost would render as "{}" —
        // presumably the load cost is non-zero whenever any counter is; confirm.
        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }
        if *num_reads > 0 {
            write!(f, ", \"num_reads\":{}", num_reads)?;
        }
        if *bytes_read > 0 {
            write!(f, ", \"bytes_read\":{}", bytes_read)?;
        }

        write!(f, "}}")
    }
}
1531
1532impl MetadataCacheMetrics {
1533    /// Returns true if the metrics are empty (contain no meaningful data).
1534    pub(crate) fn is_empty(&self) -> bool {
1535        self.metadata_load_cost.is_zero()
1536    }
1537
1538    /// Adds `other` metrics to this metrics.
1539    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1540        self.mem_cache_hit += other.mem_cache_hit;
1541        self.file_cache_hit += other.file_cache_hit;
1542        self.cache_miss += other.cache_miss;
1543        self.metadata_load_cost += other.metadata_load_cost;
1544        self.num_reads += other.num_reads;
1545        self.bytes_read += other.bytes_read;
1546    }
1547}
1548
/// Parquet reader metrics.
///
/// Aggregated per reader and merged upward via [`Self::merge_from`].
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics for parquet metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Optional metrics for page/row group fetch operations.
    /// Shared via `Arc`; presumably uses interior mutability so merging can
    /// go through a shared reference.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Memory size of metadata loaded for building file ranges.
    pub(crate) metadata_mem_size: isize,
    /// Number of file range builders created.
    pub(crate) num_range_builders: isize,
}
1573
1574impl ReaderMetrics {
1575    /// Adds `other` metrics to this metrics.
1576    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1577        self.filter_metrics.merge_from(&other.filter_metrics);
1578        self.build_cost += other.build_cost;
1579        self.scan_cost += other.scan_cost;
1580        self.num_record_batches += other.num_record_batches;
1581        self.num_batches += other.num_batches;
1582        self.num_rows += other.num_rows;
1583        self.metadata_cache_metrics
1584            .merge_from(&other.metadata_cache_metrics);
1585        if let Some(other_fetch) = &other.fetch_metrics {
1586            if let Some(self_fetch) = &self.fetch_metrics {
1587                self_fetch.merge_from(other_fetch);
1588            } else {
1589                self.fetch_metrics = Some(other_fetch.clone());
1590            }
1591        }
1592        self.metadata_mem_size += other.metadata_mem_size;
1593        self.num_range_builders += other.num_range_builders;
1594    }
1595
1596    /// Reports total rows.
1597    pub(crate) fn observe_rows(&self, read_type: &str) {
1598        READ_ROWS_TOTAL
1599            .with_label_values(&[read_type])
1600            .inc_by(self.num_rows as u64);
1601    }
1602}
1603
/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file cannot be purged while it is
    /// still being read.
    file_handle: FileHandle,
    /// Path of the file; used in error messages.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator, used to fetch row group data.
    object_store: ObjectStore,
    /// Projection mask that selects the columns to read.
    projection: ProjectionMask,
    /// Field levels of the projected columns to read.
    field_levels: FieldLevels,
    /// Cache strategy passed to the in-memory row group when fetching data.
    cache_strategy: CacheStrategy,
}
1623
impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Parquet metadata of the file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy used by this builder.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    ///
    /// Fetches the projected columns of the row group into memory first
    /// (restricted by `row_selection` if present), recording fetch timings
    /// into `fetch_metrics` when provided.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Record total fetch elapsed time.
        // NOTE(review): `unwrap()` panics if the metrics mutex was poisoned.
        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        // Builds the parquet reader over the in-memory row group, applying
        // the row selection if any.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}
1687
/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted; keeps the final metrics for reporting.
    Exhausted(ReaderMetrics),
}
1695
1696impl ReaderState {
1697    /// Returns the metrics of the reader.
1698    fn metrics(&self) -> ReaderMetrics {
1699        match self {
1700            ReaderState::Readable(reader) => reader.metrics(),
1701            ReaderState::Exhausted(m) => m.clone(),
1702        }
1703    }
1704}
1705
/// The filter to evaluate, or the prune result computed from the
/// column's default value when the SST lacks the column.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate against the SST data.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value of the column.
    Matched,
    /// The filter is pruned by the default value (the default does not
    /// satisfy the filter).
    Pruned,
}
1715
/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate, or the prune result derived from the column's
    /// default value.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}
1727
1728impl SimpleFilterContext {
1729    /// Creates a context for the `expr`.
1730    ///
1731    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1732    /// expected metadata.
1733    pub(crate) fn new_opt(
1734        sst_meta: &RegionMetadataRef,
1735        expected_meta: Option<&RegionMetadata>,
1736        expr: &Expr,
1737    ) -> Option<Self> {
1738        let filter = SimpleFilterEvaluator::try_new(expr)?;
1739        let (column_metadata, maybe_filter) = match expected_meta {
1740            Some(meta) => {
1741                // Gets the column metadata from the expected metadata.
1742                let column = meta.column_by_name(filter.column_name())?;
1743                // Checks if the column is present in the SST metadata. We still uses the
1744                // column from the expected metadata.
1745                match sst_meta.column_by_id(column.column_id) {
1746                    Some(sst_column) => {
1747                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1748
1749                        (column, MaybeFilter::Filter(filter))
1750                    }
1751                    None => {
1752                        // If the column is not present in the SST metadata, we evaluate the filter
1753                        // against the default value of the column.
1754                        // If we can't evaluate the filter, we return None.
1755                        if pruned_by_default(&filter, column)? {
1756                            (column, MaybeFilter::Pruned)
1757                        } else {
1758                            (column, MaybeFilter::Matched)
1759                        }
1760                    }
1761                }
1762            }
1763            None => {
1764                let column = sst_meta.column_by_name(filter.column_name())?;
1765                (column, MaybeFilter::Filter(filter))
1766            }
1767        };
1768
1769        Some(Self {
1770            filter: maybe_filter,
1771            column_id: column_metadata.column_id,
1772            semantic_type: column_metadata.semantic_type,
1773            data_type: column_metadata.column_schema.data_type.clone(),
1774        })
1775    }
1776
1777    /// Returns the filter to evaluate.
1778    pub(crate) fn filter(&self) -> &MaybeFilter {
1779        &self.filter
1780    }
1781
1782    /// Returns the column id.
1783    pub(crate) fn column_id(&self) -> ColumnId {
1784        self.column_id
1785    }
1786
1787    /// Returns the semantic type of the column.
1788    pub(crate) fn semantic_type(&self) -> SemanticType {
1789        self.semantic_type
1790    }
1791
1792    /// Returns the data type of the column.
1793    pub(crate) fn data_type(&self) -> &ConcreteDataType {
1794        &self.data_type
1795    }
1796}
1797
1798/// Prune a column by its default value.
1799/// Returns false if we can't create the default value or evaluate the filter.
1800fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1801    let value = column.column_schema.create_default().ok().flatten()?;
1802    let scalar_value = value
1803        .try_to_scalar_value(&column.column_schema.data_type)
1804        .ok()?;
1805    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1806    Some(!matches)
1807}
1808
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context, shared with the reader of each row group.
    context: FileRangeContextRef,
    /// Remaining row group selection to read.
    selection: RowGroupSelection,
    /// Reader state of the current row group, or the final metrics once
    /// all row groups are exhausted.
    reader_state: ReaderState,
    /// Metrics for tracking row group fetch operations.
    fetch_metrics: ParquetFetchMetrics,
}
1820
1821#[async_trait]
1822impl BatchReader for ParquetReader {
1823    #[tracing::instrument(
1824        skip_all,
1825        fields(
1826            region_id = %self.context.reader_builder().file_handle.region_id(),
1827            file_id = %self.context.reader_builder().file_handle.file_id()
1828        )
1829    )]
1830    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1831        let ReaderState::Readable(reader) = &mut self.reader_state else {
1832            return Ok(None);
1833        };
1834
1835        // We don't collect the elapsed time if the reader returns an error.
1836        if let Some(batch) = reader.next_batch().await? {
1837            return Ok(Some(batch));
1838        }
1839
1840        // No more items in current row group, reads next row group.
1841        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
1842            let parquet_reader = self
1843                .context
1844                .reader_builder()
1845                .build(
1846                    row_group_idx,
1847                    Some(row_selection),
1848                    Some(&self.fetch_metrics),
1849                )
1850                .await?;
1851
1852            // Resets the parquet reader.
1853            // Compute skip_fields for this row group
1854            let skip_fields = self.context.should_skip_fields(row_group_idx);
1855            reader.reset_source(
1856                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
1857                skip_fields,
1858            );
1859            if let Some(batch) = reader.next_batch().await? {
1860                return Ok(Some(batch));
1861            }
1862        }
1863
1864        // The reader is exhausted.
1865        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
1866        Ok(None)
1867    }
1868}
1869
1870impl Drop for ParquetReader {
1871    fn drop(&mut self) {
1872        let metrics = self.reader_state.metrics();
1873        debug!(
1874            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
1875            self.context.reader_builder().file_handle.region_id(),
1876            self.context.reader_builder().file_handle.file_id(),
1877            self.context.reader_builder().file_handle.time_range(),
1878            metrics.filter_metrics.rg_total
1879                - metrics.filter_metrics.rg_inverted_filtered
1880                - metrics.filter_metrics.rg_minmax_filtered
1881                - metrics.filter_metrics.rg_fulltext_filtered
1882                - metrics.filter_metrics.rg_bloom_filtered,
1883            metrics.filter_metrics.rg_total,
1884            metrics
1885        );
1886
1887        // Report metrics.
1888        READ_STAGE_ELAPSED
1889            .with_label_values(&["build_parquet_reader"])
1890            .observe(metrics.build_cost.as_secs_f64());
1891        READ_STAGE_ELAPSED
1892            .with_label_values(&["scan_row_groups"])
1893            .observe(metrics.scan_cost.as_secs_f64());
1894        metrics.observe_rows("parquet_reader");
1895        metrics.filter_metrics.observe();
1896    }
1897}
1898
1899impl ParquetReader {
1900    /// Creates a new reader.
1901    #[tracing::instrument(
1902        skip_all,
1903        fields(
1904            region_id = %context.reader_builder().file_handle.region_id(),
1905            file_id = %context.reader_builder().file_handle.file_id()
1906        )
1907    )]
1908    pub(crate) async fn new(
1909        context: FileRangeContextRef,
1910        mut selection: RowGroupSelection,
1911    ) -> Result<Self> {
1912        let fetch_metrics = ParquetFetchMetrics::default();
1913        // No more items in current row group, reads next row group.
1914        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
1915            let parquet_reader = context
1916                .reader_builder()
1917                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
1918                .await?;
1919            // Compute skip_fields once for this row group
1920            let skip_fields = context.should_skip_fields(row_group_idx);
1921            ReaderState::Readable(PruneReader::new_with_row_group_reader(
1922                context.clone(),
1923                RowGroupReader::new(context.clone(), parquet_reader),
1924                skip_fields,
1925            ))
1926        } else {
1927            ReaderState::Exhausted(ReaderMetrics::default())
1928        };
1929
1930        Ok(ParquetReader {
1931            context,
1932            selection,
1933            reader_state,
1934            fetch_metrics,
1935        })
1936    }
1937
1938    /// Returns the metadata of the SST.
1939    pub fn metadata(&self) -> &RegionMetadataRef {
1940        self.context.read_format().metadata()
1941    }
1942
1943    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
1944        self.context.reader_builder().parquet_meta.clone()
1945    }
1946}
1947
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    /// Converts an arrow-level read result into this crate's [Result],
    /// attaching implementation-specific context (e.g. the file path).
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    /// Returns the [ReadFormat] used to decode record batches.
    fn read_format(&self) -> &ReadFormat;
}
1958
1959impl RowGroupReaderContext for FileRangeContextRef {
1960    fn map_result(
1961        &self,
1962        result: std::result::Result<Option<RecordBatch>, ArrowError>,
1963    ) -> Result<Option<RecordBatch>> {
1964        result.context(ArrowReaderSnafu {
1965            path: self.file_path(),
1966        })
1967    }
1968
1969    fn read_format(&self) -> &ReadFormat {
1970        self.as_ref().read_format()
1971    }
1972}
1973
/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context and a parquet reader.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}
1983
/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of [RowGroupReader] so adapts to different underlying implementation.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Batches converted from the last record batch, buffered until
    /// returned one by one.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences; built once so it can
    /// be reused for every batch.
    override_sequence: Option<ArrayRef>,
}
1997
1998impl<T> RowGroupReaderBase<T>
1999where
2000    T: RowGroupReaderContext,
2001{
2002    /// Creates a new reader to read the primary key format.
2003    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
2004        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2005        let override_sequence = context
2006            .read_format()
2007            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2008        assert!(context.read_format().as_primary_key().is_some());
2009
2010        Self {
2011            context,
2012            reader,
2013            batches: VecDeque::new(),
2014            metrics: ReaderMetrics::default(),
2015            override_sequence,
2016        }
2017    }
2018
2019    /// Gets the metrics.
2020    pub(crate) fn metrics(&self) -> &ReaderMetrics {
2021        &self.metrics
2022    }
2023
2024    /// Gets [ReadFormat] of underlying reader.
2025    pub(crate) fn read_format(&self) -> &ReadFormat {
2026        self.context.read_format()
2027    }
2028
2029    /// Tries to fetch next [RecordBatch] from the reader.
2030    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2031        self.context.map_result(self.reader.next().transpose())
2032    }
2033
2034    /// Returns the next [Batch].
2035    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
2036        let scan_start = Instant::now();
2037        if let Some(batch) = self.batches.pop_front() {
2038            self.metrics.num_rows += batch.num_rows();
2039            self.metrics.scan_cost += scan_start.elapsed();
2040            return Ok(Some(batch));
2041        }
2042
2043        // We need to fetch next record batch and convert it to batches.
2044        while self.batches.is_empty() {
2045            let Some(record_batch) = self.fetch_next_record_batch()? else {
2046                self.metrics.scan_cost += scan_start.elapsed();
2047                return Ok(None);
2048            };
2049            self.metrics.num_record_batches += 1;
2050
2051            // Safety: We ensures the format is primary key in the RowGroupReaderBase::create().
2052            self.context
2053                .read_format()
2054                .as_primary_key()
2055                .unwrap()
2056                .convert_record_batch(
2057                    &record_batch,
2058                    self.override_sequence.as_ref(),
2059                    &mut self.batches,
2060                )?;
2061            self.metrics.num_batches += self.batches.len();
2062        }
2063        let batch = self.batches.pop_front();
2064        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
2065        self.metrics.scan_cost += scan_start.elapsed();
2066        Ok(batch)
2067    }
2068}
2069
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Returns the next [Batch] by delegating to the synchronous
    /// [RowGroupReaderBase::next_inner].
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}
2079
/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences; built once in `new`
    /// so it can be reused across batches.
    override_sequence: Option<ArrayRef>,
}
2089
2090impl FlatRowGroupReader {
2091    /// Creates a new flat reader from file range.
2092    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
2093        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2094        let override_sequence = context
2095            .read_format()
2096            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2097
2098        Self {
2099            context,
2100            reader,
2101            override_sequence,
2102        }
2103    }
2104
2105    /// Returns the next RecordBatch.
2106    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2107        match self.reader.next() {
2108            Some(batch_result) => {
2109                let record_batch = batch_result.context(ArrowReaderSnafu {
2110                    path: self.context.file_path(),
2111                })?;
2112
2113                // Safety: Only flat format use FlatRowGroupReader.
2114                let flat_format = self.context.read_format().as_flat().unwrap();
2115                let record_batch =
2116                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
2117                Ok(Some(record_batch))
2118            }
2119            None => Ok(None),
2120        }
2121    }
2122}