mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

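/// Handles an index application error: panics in test builds so index bugs
/// surface early, but only logs a warning in production so a broken index
/// never fails the read path.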
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
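///
/// A minimal usage sketch (hypothetical `file_dir`, `path_type`, `file_handle`,
/// `object_store`, `column_ids`, and `cache_strategy` values; predicate,
/// projection, and cache are optional):
///
/// ```ignore
/// let reader = ParquetReaderBuilder::new(file_dir, path_type, file_handle, object_store)
///     .predicate(predicate)         // optional predicate to push down
///     .projection(Some(column_ids)) // optional projection by column id
///     .cache(cache_strategy)        // optional SST cache strategy
///     .build()
///     .await?;
/// ```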
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema changes, the projection
    /// can contain columns that are not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use the flat format for reading.
    flat_format: bool,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        file_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
            flat_format: false,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform I/O operations.
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform I/O operations.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes the region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                column_ids.iter().copied(),
                self.flat_format,
            )
        } else {
            // Lists all column ids to read; we always prefer the expected metadata if available.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
                self.flat_format,
            )
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume there are no nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as the JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes the region metadata from the key-value metadata.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads the parquet metadata of a specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the global cache.
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        // Cache miss, load the metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
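    ///
    /// Pruning runs as a cascade: min-max statistics first, then the fine-grained
    /// fulltext index, the inverted index, the bloom filter index, and finally the
    /// coarse fulltext bloom filter (only when the fine-grained fulltext pass was
    /// not applied). Each stage returns early once the selection becomes empty.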
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by the fulltext index. Returns `true` if the index was applied.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to disable index usage as an escape
    /// hatch in case of index issues; it could also be used to test the
    /// correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimization: only search row groups that are required by `output`
                // and not already stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // Newly searched row groups are in `selection`; concatenate them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimization: only search row groups that are required by `output`
                // and not already stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        // Newly searched row groups are in `selection`; concatenate them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Prunes row groups by min-max index.
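    /// Returns `true` if a predicate was available and pruning was attempted.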
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If a column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

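    /// Applies the index `result` to `output` and caches `result` under
    /// `predicate_key` so later reads of the same file can reuse it.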
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

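/// Intersects `output` with the index `result`, records how many row groups and
/// rows the index filtered out, and replaces `output` with the intersection.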
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

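/// Returns `true` if every non-empty row group required by `required_row_groups`
/// has already been searched by the cached result, so the cached selection can be
/// applied without consulting the index again.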
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows doesn't need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by the min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row groups before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row groups filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row groups filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row groups filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by the precise filter.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Merges `other` into these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Metrics of filtered row groups and rows.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Merges `other` into these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports the total number of rows read.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
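///
/// The builder first fetches the row group's pages into memory (through the cache
/// when possible) and then constructs the record batch reader over the in-memory
/// data.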
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file purger won't remove the file while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader over the in-memory row group, applying the
        // optional row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate, or the result of pruning against the column's default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter cannot match the default value, so the column is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
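    ///
    /// When `expected_meta` is provided, column resolution prefers it; if the
    /// column is missing from the SST, the filter is evaluated against the
    /// column's default value instead.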
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
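///
/// For example (a sketch with hypothetical values): given the filter
/// `status = 'active'` and a column whose default value is `'inactive'`, rows
/// that only carry the default cannot match, so this returns `Some(true)`.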
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
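///
/// The reader pops row groups from the selection one at a time; each row group is
/// fetched into memory, decoded into [Batch]es, and drained before the next row
/// group reader is built.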
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in the current row group; read the next row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the parquet reader.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Builds a reader for the first row group in the selection, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the [RowGroupReader], which adapts it to different underlying
    /// implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch the next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is primary key in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in the flat format, returning [RecordBatch]es.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from a file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next [RecordBatch].
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Applies the override sequence if needed.
                if let (Some(flat_format), Some(override_array)) = (
                    self.context.read_format().as_flat(),
                    &self.override_sequence,
                ) {
                    let converted =
                        flat_format.convert_batch(record_batch, Some(override_array))?;
                    return Ok(Some(converted));
                }

                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}