mito2/sst/parquet/reader.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

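/// Reports an error from applying an index: panics in tests (or with the
/// `test` feature enabled) so index bugs surface early, and only logs a
/// warning in production so the read falls back to scanning without the index.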
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        file_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
            flat_format: false,
            compaction: false,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
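    ///
    /// # Example
    ///
    /// A minimal usage sketch; `file_dir`, `path_type`, `file_handle`,
    /// `object_store` and `predicate` are assumed to be constructed by the
    /// caller:
    ///
    /// ```ignore
    /// let mut reader = ParquetReaderBuilder::new(file_dir, path_type, file_handle, object_store)
    ///     .predicate(predicate)
    ///     .cache(CacheStrategy::Disabled)
    ///     .build()
    ///     .await?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Consume the batch.
    /// }
    /// ```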
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            // Lists all column ids to read; we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume there are no nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from key value.
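    ///
    /// The region metadata is stored as a JSON string in the parquet key-value
    /// metadata under [PARQUET_METADATA_KEY]; this function is the inverse of
    /// that encoding.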
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of a specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from global cache.
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
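    ///
    /// Pruning runs as a cascade: min-max statistics first, then the
    /// fine-grained fulltext index, the inverted index, the bloom filter
    /// index, and finally the coarse fulltext bloom filter when the
    /// fine-grained fulltext pass didn't apply. Each stage returns early once
    /// the selection becomes empty.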
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            return true;
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to forcibly bypass indices as an
    /// escape hatch in case of index issues; it could also be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
            return true;
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
            return true;
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimization: only search the row groups that are required by `output`
                // and not already stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // Newly searched row groups are added to `selection`; concatenate them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            return true;
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimization: only search the row groups that are required by `output`
                // and not already stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        // Newly searched row groups are added to `selection`; concatenate them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Prunes row groups by min-max index.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

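/// Intersects `output` with the index apply `result`, records how many row
/// groups and rows the index filtered out, and replaces `output` with the
/// intersection.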
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

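/// Returns true if every non-empty row group required by `required_row_groups`
/// has already been searched and recorded in `cached_row_groups`, i.e. the
/// cached index result alone is enough to answer the query.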
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows doesn't need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this one.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this one.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while we read it.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Returns the parquet metadata of the file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Returns the cache strategy.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader with the computed row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column metadata from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
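///
/// For example (hypothetical column and filter): if a column added by a schema
/// change has default value `0` and the filter is `col > 5`, rows from an SST
/// written before the change can never match, so the filter prunes the file.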
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
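///
/// Reading happens one row group at a time; when dropped, the reader logs how
/// many row groups survived pruning and reports its metrics to Prometheus.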
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in current row group, reads next row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the parquet reader.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Builds a reader for the first row group, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of [RowGroupReader], so it can adapt to different underlying implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch the next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is primary key in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next RecordBatch.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;
                // Safety: Only the flat format uses FlatRowGroupReader.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}