mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

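/// Handles an error from an index applier: panics in tests so index bugs
/// surface early, but only logs a warning in production so the read can fall
/// back to scanning without the index.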
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
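///
/// A minimal usage sketch (illustrative only; assumes a `FileHandle`, an
/// `ObjectStore`, and the column ids to project are already at hand):
///
/// ```ignore
/// let mut reader = ParquetReaderBuilder::new(table_dir, path_type, handle, object_store)
///     .projection(Some(vec![0, 1]))
///     .predicate(predicate)
///     .build()
///     .await?;
/// while let Some(batch) = reader.next_batch().await? {
///     // Consume the decoded `Batch`.
/// }
/// ```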
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Decodes primary key values eagerly when reading primary key format SSTs.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self
            .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
            .await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            // Lists all column ids to read; we prefer the expected metadata when available.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume there are no nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            filters,
            read_format,
            codec,
            self.pre_filter_mode,
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from the parquet key-value metadata.
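    ///
    /// The SST writer stores the region metadata as JSON in the parquet
    /// footer's key-value metadata under [PARQUET_METADATA_KEY]; this helper
    /// looks up that entry and parses it.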
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of a specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
    ) -> Result<Arc<ParquetMetaData>> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok(metadata);
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;

        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.compute_skip_fields(parquet_meta);

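        // Pruning cascade: cheap min-max statistics run first, then the
        // fulltext, inverted, and bloom filter indexes. Each stage only
        // shrinks `output`, and the cascade stops early once it is empty.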
        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by the fulltext index. Returns `true` if the index was applied.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it already covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to disable index usage, both as an
    /// escape route in case of index issues and as a way to test the
    /// correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it already covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it already covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimization: only search row groups that are required by `output` and not already covered by `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are in `selection`; concatenate it with the cached result.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it already covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimization: only search row groups that are required by `output` and not already covered by `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are in `selection`; concatenate it with the cached result.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes whether to skip field columns when building statistics based on PreFilterMode.
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

    /// Prunes row groups by min-max index.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

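/// Intersects `output` with an index apply `result`, recording how many row
/// groups and rows the index filtered out under the given `index_type`.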
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

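/// Returns whether every non-empty row group required by `required_row_groups`
/// already has a result in `cached_row_groups`, i.e. the cached selection can
/// be applied without running the index applier again.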
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows doesn't need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group is already searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
    /// Merges the `other` metrics into this one.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;

        // Merge optional applier metrics
        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
}

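// Renders the metrics as a compact JSON-like string, omitting zero counters.
// Illustrative output: {"metadata_load_cost":"1.2ms", "mem_cache_hit":3}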
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Merges the `other` metrics into this one.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics for parquet metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Optional metrics for page/row group fetch operations.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}

impl ReaderMetrics {
    /// Merges the `other` metrics into this one.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while it is in use.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Record total fetch elapsed time.
        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        // Builds the parquet reader over the in-memory row group, honoring
        // the row selection if present.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
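///
/// For example (illustrative): if a column was added with default value `0`
/// after this SST was written and the filter is `col > 10`, no row in this
/// SST can match, so the column can be pruned by its default value.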
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of current row group.
    reader_state: ReaderState,
    /// Metrics for tracking row group fetch operations.
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in the current row group; read the next one.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                )
                .await?;

            // Resets the parquet reader.
            // Compute skip_fields for this row group
            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        // Reads the first row group, if any; otherwise the reader starts exhausted.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            // Computes skip_fields once for this row group.
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    /// Returns the parquet metadata of the SST.
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that can be shared
/// between different `RowGroupReader`s.
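///
/// A sketch of a hypothetical implementor (`MyContext` and its fields are
/// illustrative, not part of the codebase):
///
/// ```ignore
/// struct MyContext {
///     read_format: ReadFormat,
///     file_path: String,
/// }
///
/// impl RowGroupReaderContext for MyContext {
///     fn map_result(
///         &self,
///         result: std::result::Result<Option<RecordBatch>, ArrowError>,
///     ) -> Result<Option<RecordBatch>> {
///         // Attaches the file path to arrow errors, as the file range impl does.
///         result.context(ArrowReaderSnafu {
///             path: self.file_path.clone(),
///         })
///     }
///
///     fn read_format(&self) -> &ReadFormat {
///         &self.read_format
///     }
/// }
/// ```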
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
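///
/// A minimal reading loop (sketch; `context` implements [RowGroupReaderContext]
/// and `parquet_reader` is assumed to be built elsewhere):
///
/// ```ignore
/// let mut reader = RowGroupReaderBase::create(context, parquet_reader);
/// while let Some(batch) = reader.next_inner()? {
///     // Each `Batch` holds rows with the same primary key.
/// }
/// ```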
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the [RowGroupReader], which adapts it to different underlying
    /// implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch the next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is the primary key format in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in flat format, returning [RecordBatch]es.
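///
/// A minimal draining loop (sketch; `context` and `parquet_reader` are assumed
/// to come from the file range builder):
///
/// ```ignore
/// let mut reader = FlatRowGroupReader::new(context, parquet_reader);
/// while let Some(record_batch) = reader.next_batch()? {
///     // `record_batch` is already converted to the flat read format.
/// }
/// ```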
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next RecordBatch.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Safety: Only the flat format uses FlatRowGroupReader.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}