mito2/sst/parquet/
reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::{
    intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids,
};
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        file_dir: String,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform I/O operations.
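    ///
    /// A minimal usage sketch (illustrative only; `file_dir`, `handle` and
    /// `store` are assumed to be in scope, they are not defined in this file):
    ///
    /// ```ignore
    /// let mut reader = ParquetReaderBuilder::new(file_dir, handle, store)
    ///     .projection(Some(vec![0, 1]))
    ///     .cache(CacheStrategy::Disabled)
    ///     .build()
    ///     .await?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Consume each `Batch` read from the SST.
    /// }
    /// ```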
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), row_groups).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform I/O operations.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupMap)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // Lists all column ids to read; we prefer the expected metadata when available.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume there are no nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let row_groups = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, row_groups))
    }

    /// Decodes region metadata from key value.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of a specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let region_id = self.file_handle.region_id();
        let file_id = self.file_handle.file_id();
        // Tries to get from global cache.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(region_id, file_id)
            .await
        {
            return Ok(metadata);
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy.put_parquet_meta_data(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            metadata.clone(),
        );

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> BTreeMap<usize, Option<RowSelection>> {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return BTreeMap::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return BTreeMap::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = (0..num_row_groups).map(|i| (i, None)).collect();

        self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        let inverted_filtered = self
            .prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !inverted_filtered {
            self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        }

        self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
            .await;

        self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics)
            .await;

        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if the index was applied to prune row groups.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = match index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await
        {
            Ok(Some(res)) => res,
            Ok(None) => {
                return false;
            }
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply full-text index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        let row_group_to_row_ids =
            Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups());
        Self::prune_row_groups_by_rows(
            parquet_meta,
            row_group_to_row_ids,
            output,
            &mut metrics.rg_fulltext_filtered,
            &mut metrics.rows_fulltext_filtered,
        );

        true
    }

    /// Groups row IDs into row groups, with each group's row IDs starting from 0.
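    ///
    /// For example (mirroring `test_group_row_ids` below), with
    /// `row_group_size = 5` and `num_row_groups = 3`:
    ///
    /// ```ignore
    /// let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
    /// let groups = ParquetReaderBuilder::group_row_ids(row_ids, 5, 3);
    /// assert_eq!(groups, vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]);
    /// ```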
    fn group_row_ids(
        row_ids: BTreeSet<u32>,
        row_group_size: usize,
        num_row_groups: usize,
    ) -> Vec<(usize, Vec<usize>)> {
        let est_rows_per_group = row_ids.len() / num_row_groups;

        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
        for row_id in row_ids {
            let row_group_id = row_id as usize / row_group_size;
            let row_id_in_group = row_id as usize % row_group_size;

            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
                && *rg_id == row_group_id
            {
                row_ids.push(row_id_in_group);
            } else {
                let mut row_ids = Vec::with_capacity(est_rows_per_group);
                row_ids.push(row_id_in_group);
                row_group_to_row_ids.push((row_group_id, row_ids));
            }
        }

        row_group_to_row_ids
    }

    /// Applies index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to forcibly disable indices, both as
    /// an escape hatch in case of index issues and as a way to test the
    /// correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await
        {
            Ok(output) => output,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply inverted index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        let segment_row_count = apply_output.segment_row_count;
        let grouped_in_row_groups = apply_output
            .matched_segment_ids
            .iter_ones()
            .map(|seg_id| {
                let begin_row_id = seg_id * segment_row_count;
                let row_group_id = begin_row_id / row_group_size;

                let rg_begin_row_id = begin_row_id % row_group_size;
                let rg_end_row_id = rg_begin_row_id + segment_row_count;

                (row_group_id, rg_begin_row_id..rg_end_row_id)
            })
            .chunk_by(|(row_group_id, _)| *row_group_id);

        let ranges_in_row_groups = grouped_in_row_groups
            .into_iter()
            .map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges)));

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            ranges_in_row_groups,
            output,
            &mut metrics.rg_inverted_filtered,
            &mut metrics.rows_inverted_filtered,
        );

        true
    }

    /// Prunes row groups by min-max index.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.len();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        let res = predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .filter_map(|(mask, row_group)| {
                if !*mask {
                    return None;
                }

                let selection = output.remove(&row_group)?;
                Some((row_group, selection))
            })
            .collect::<BTreeMap<_, _>>();

        let row_groups_after = res.len();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        *output = res;
        true
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply(
                self.file_handle.file_id(),
                Some(file_size_hint),
                parquet_meta
                    .row_groups()
                    .iter()
                    .enumerate()
                    .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
            )
            .await
        {
            Ok(apply_output) => apply_output,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            apply_output
                .into_iter()
                .map(|(rg, ranges)| (rg, ranges.into_iter())),
            output,
            &mut metrics.rg_bloom_filtered,
            &mut metrics.rows_bloom_filtered,
        );

        true
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply_coarse(
                self.file_handle.file_id(),
                Some(file_size_hint),
                parquet_meta
                    .row_groups()
                    .iter()
                    .enumerate()
                    .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
            )
            .await
        {
            Ok(Some(apply_output)) => apply_output,
            Ok(None) => return false,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply fulltext index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            apply_output
                .into_iter()
                .map(|(rg, ranges)| (rg, ranges.into_iter())),
            output,
            &mut metrics.rg_fulltext_filtered,
            &mut metrics.rows_fulltext_filtered,
        );

        true
    }

    /// Prunes row groups by rows. The `rows_in_row_groups` maps each row group
    /// index to a sorted list of row ids to keep within that group.
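    ///
    /// A sketch of the behavior, based on the unit tests below (`meta` has row
    /// groups of 10, 10 and 5 rows; `rg` and `rows` are the metric counters):
    ///
    /// ```ignore
    /// let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
    /// let rows_to_keep = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];
    /// ParquetReaderBuilder::prune_row_groups_by_rows(&meta, rows_to_keep, &mut output, &mut rg, &mut rows);
    /// // Row group 1 is pruned entirely; 0 keeps its last 5 rows and 2 keeps
    /// // all 5 rows, so `rg == 1` and `rows == 15`.
    /// ```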
    fn prune_row_groups_by_rows(
        parquet_meta: &ParquetMetaData,
        rows_in_row_groups: Vec<(usize, Vec<usize>)>,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        filtered_row_groups: &mut usize,
        filtered_rows: &mut usize,
    ) {
        let row_groups_before = output.len();
        let mut rows_in_row_group_before = 0;
        let mut rows_in_row_group_after = 0;

        let mut res = BTreeMap::new();
        for (row_group, row_ids) in rows_in_row_groups {
            let Some(selection) = output.remove(&row_group) else {
                continue;
            };

            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());

            let new_selection =
                row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count);
            let intersected_selection = intersect_row_selections(selection, Some(new_selection));

            let num_rows_after = intersected_selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
            rows_in_row_group_after += num_rows_after;

            if num_rows_after > 0 {
                res.insert(row_group, intersected_selection);
            }
        }

        // Pruned row groups.
        while let Some((row_group, selection)) = output.pop_first() {
            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
        }

        let row_groups_after = res.len();
        *filtered_row_groups += row_groups_before - row_groups_after;
        *filtered_rows += rows_in_row_group_before - rows_in_row_group_after;

        *output = res;
    }

    /// Prunes row groups by ranges. The `ranges_in_row_groups` maps each row
    /// group index to a list of row ranges to keep within that group.
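    ///
    /// Ranges are half-open and local to each row group; for example,
    /// `(0, [5..10])` keeps the last five rows of a ten-row row group 0, just
    /// like `(0, vec![5, 6, 7, 8, 9])` in [Self::prune_row_groups_by_rows].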
    fn prune_row_groups_by_ranges(
        parquet_meta: &ParquetMetaData,
        ranges_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = Range<usize>>)>,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        filtered_row_groups: &mut usize,
        filtered_rows: &mut usize,
    ) {
        let row_groups_before = output.len();
        let mut rows_in_row_group_before = 0;
        let mut rows_in_row_group_after = 0;

        let mut res = BTreeMap::new();
        for (row_group, row_ranges) in ranges_in_row_groups {
            let Some(selection) = output.remove(&row_group) else {
                continue;
            };

            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());

            let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count);
            let intersected_selection = intersect_row_selections(selection, Some(new_selection));

            let num_rows_after = intersected_selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
            rows_in_row_group_after += num_rows_after;

            if num_rows_after > 0 {
                res.insert(row_group, intersected_selection);
            }
        }

        // Pruned row groups.
        while let Some((row_group, selection)) = output.pop_first() {
            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
        }

        let row_groups_after = res.len();
        *filtered_row_groups += row_groups_before - row_groups_after;
        *filtered_rows += rows_in_row_group_before - rows_in_row_group_after;

        *output = res;
    }
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while we are reading it.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader.
        // Passes the row selection so the reader only decodes selected rows.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column metadata from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
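///
/// For example, if a column that is missing from the SST has the default value
/// `0` and the filter is `col > 10`, the filter can never match the default,
/// so the function returns `Some(true)` and the filter is pruned.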
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Indices of row groups to read, along with their respective row selections.
    row_groups: RowGroupMap,
    /// Reader of current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in current row group, reads next row group.
        while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, row_selection)
                .await?;

            // Resets the parquet reader.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    async fn new(
        context: FileRangeContextRef,
        mut row_groups: BTreeMap<usize, Option<RowSelection>>,
    ) -> Result<Self> {
        // Builds a reader for the first row group, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, row_selection)
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            row_groups,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the [RowGroupReader], so it can adapt to different underlying implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets [ReadFormat] of underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .convert_record_batch(&record_batch, &mut self.batches)?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

#[cfg(test)]
mod tests {
    use parquet::arrow::arrow_reader::RowSelector;
    use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
    use parquet::schema::types::{SchemaDescriptor, Type};

    use super::*;

    fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec<i64>) -> ParquetMetaData {
        let tp = Arc::new(Type::group_type_builder("test").build().unwrap());
        let schema_descr = Arc::new(SchemaDescriptor::new(tp));

        let mut row_groups = Vec::new();
        for num_rows in &num_rows_in_row_groups {
            let row_group = RowGroupMetaData::builder(schema_descr.clone())
                .set_num_rows(*num_rows)
                .build()
                .unwrap();
            row_groups.push(row_group);
        }

        let file_meta = FileMetaData::new(
            0,
            num_rows_in_row_groups.iter().sum(),
            None,
            None,
            schema_descr,
            None,
        );
        ParquetMetaData::new(file_meta, row_groups)
    }

    #[test]
    fn test_group_row_ids() {
        let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
        let row_group_size = 5;
        let num_row_groups = 3;

        let row_group_to_row_ids =
            ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups);

        assert_eq!(
            row_group_to_row_ids,
            vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
        );
    }

    #[test]
    fn prune_row_groups_by_rows_from_empty() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        // The original output is empty. No row groups are pruned.
        let mut output = BTreeMap::new();
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert!(output.is_empty());
        assert_eq!(filtered_row_groups, 0);
        assert_eq!(filtered_rows, 0);
    }

    #[test]
    fn prune_row_groups_by_rows_from_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        // The original output is full.
        let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([
                (
                    0,
                    Some(RowSelection::from(vec![
                        RowSelector::skip(5),
                        RowSelector::select(5),
                    ]))
                ),
                (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
            ])
        );
        assert_eq!(filtered_row_groups, 1);
        assert_eq!(filtered_rows, 15);
    }

    #[test]
    fn prune_row_groups_by_rows_from_not_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        // The original output is not full.
        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_empty() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        // The original output is empty. No row groups are pruned.
        let mut output = BTreeMap::new();
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert!(output.is_empty());
        assert_eq!(filtered_row_groups, 0);
        assert_eq!(filtered_rows, 0);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        // The original output is full.
        let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([
                (
                    0,
                    Some(RowSelection::from(vec![
                        RowSelector::skip(5),
                        RowSelector::select(5),
                    ]))
                ),
                (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
            ])
        );
        assert_eq!(filtered_row_groups, 1);
        assert_eq!(filtered_rows, 15);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_not_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        // The original output is not full.
        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }

    #[test]
    fn prune_row_groups_by_ranges_exceed_end() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        // The range exceeds the end of the row group.
        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())];

        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }
}