mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
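    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled; assumes `table_dir`, `path_type`,
    /// `file_handle`, `object_store` and `predicate` are already in scope):
    ///
    /// ```ignore
    /// let mut reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
    ///     .predicate(Some(predicate))
    ///     .projection(None) // `None` reads all columns.
    ///     .build()
    ///     .await?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Process the batch.
    /// }
    /// ```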
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            // Lists all column ids to read; we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume we don't have nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            filters,
            read_format,
            codec,
            self.pre_filter_mode,
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from key value.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from global cache.
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
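    ///
    /// Pruning runs in stages, each narrowing the selection: min-max statistics
    /// first, then the fulltext index (fine-grained), the inverted index, the
    /// bloom filter index, and finally the coarse fulltext bloom pass if the
    /// fine-grained fulltext pass didn't filter anything.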
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to force-disable index usage as an
    /// escape hatch in case of index issues; it could also be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(self.file_handle.file_id(), Some(file_size_hint))
                .await;
            let selection = match apply_res {
                Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concatenate them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concatenate them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes whether to skip field columns when building statistics based on PreFilterMode.
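    ///
    /// `All` never skips fields, `SkipFields` always skips them, and
    /// `SkipFieldsOnDelete` skips them only when some row group in the file
    /// contains a delete op.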
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

    /// Prunes row groups by min-max index.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get a None statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

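/// Intersects `output` with an index `result` and attributes the reduction to
/// `index_type` in the metrics. For example, if `output` selects 8 row groups
/// (1000 rows) and the intersection keeps 5 row groups (600 rows), the index
/// is credited with filtering 3 row groups and 400 rows.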
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

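/// Returns whether the cached index result already covers every row group that
/// is still required by `required_row_groups`, i.e. whether the cached result
/// alone is sufficient and the index doesn't need to be applied again.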
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group without selected rows doesn't need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader with the given row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate or the prune result of the default value.
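/// When the filter column is missing from the SST, the filter is evaluated
/// against the column's default value instead: `Matched` means the default
/// value satisfies the filter, while `Pruned` means it doesn't, so the data
/// can be skipped for this filter.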
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
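    ///
    /// A minimal sketch (not compiled; assumes `sst_meta` is in scope and the
    /// region has a column named "host"):
    ///
    /// ```ignore
    /// use datafusion_expr::{col, lit};
    ///
    /// let expr = col("host").eq(lit("a"));
    /// let ctx = SimpleFilterContext::new_opt(&sst_meta, None, &expr);
    /// assert!(ctx.is_some());
    /// ```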
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
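///
/// For example (a sketch, not compiled): with a filter `col = 5` on a column
/// whose default value is `0`, the default doesn't match the filter, so this
/// returns `Some(true)` and the filter is treated as [MaybeFilter::Pruned].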
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in the current row group, read the next row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the parquet reader.
            // Compute skip_fields for this row group
            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Reads the first row group to initialize the reader state.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            // Compute skip_fields once for this row group
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that can be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the [RowGroupReader] that adapts it to different underlying implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets [ReadFormat] of underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is primary key in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next RecordBatch.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Safety: Only the flat format uses FlatRowGroupReader.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}