use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::{
    intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids,
};
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

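/// Parquet SST reader builder.
///
/// A minimal usage sketch; the `file_dir`, `handle`, `store`, and `column_ids`
/// values are assumptions for illustration:
///
/// ```ignore
/// let reader = ParquetReaderBuilder::new(file_dir, handle, store)
///     .projection(Some(column_ids))
///     .cache(CacheStrategy::Disabled)
///     .build()
///     .await?;
/// ```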
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store to read the file from.
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of columns to read. `None` reads all columns.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Inverted index applier.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Bloom filter index applier.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Full-text index applier.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST. This is usually the
    /// latest metadata of the region. The reader uses it to resolve columns by name.
    expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        file_dir: String,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder. Only the projected columns are read.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache strategy to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the full-text index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

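    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.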
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), row_groups).await
    }

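    /// Builds a [FileRangeContext] and collects the row groups to read.
    ///
    /// This needs to perform IO operations.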
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupMap)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes the region metadata stored in the SST.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // Lists all column ids to read; prefers the expected metadata if provided.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };

        // Computes the projection mask. This assumes the schema is not nested.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels for the arrow reader.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let row_groups = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, row_groups))
    }

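    /// Decodes the region metadata from the parquet key-value metadata.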
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

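    /// Reads the parquet metadata of a specific file.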
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let region_id = self.file_handle.region_id();
        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the cache first.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(region_id, file_id)
            .await
        {
            return Ok(metadata);
        }

        // Cache miss: loads the metadata from the file.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the loaded metadata.
        self.cache_strategy.put_parquet_meta_data(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            metadata.clone(),
        );

        Ok(metadata)
    }

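    /// Computes the row groups to read, along with their respective row selections.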
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> BTreeMap<usize, Option<RowSelection>> {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return BTreeMap::default();
        }

        // Assumes the number of rows in the first row group represents the row group
        // size of the file (all row groups share the same size except the last one).
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return BTreeMap::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = (0..num_row_groups).map(|i| (i, None)).collect();

        self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        let inverted_filtered = self
            .prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !inverted_filtered {
            self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        }

        self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
            .await;

        self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics)
            .await;

        output
    }

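    /// Prunes row groups by the full-text index. Returns `true` if the index is
    /// applied successfully.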
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = match index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await
        {
            Ok(Some(res)) => res,
            Ok(None) => {
                return false;
            }
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply full-text index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        let row_group_to_row_ids =
            Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups());
        Self::prune_row_groups_by_rows(
            parquet_meta,
            row_group_to_row_ids,
            output,
            &mut metrics.rg_fulltext_filtered,
            &mut metrics.rows_fulltext_filtered,
        );

        true
    }

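    /// Groups row ids into row groups, converting each global row id to an
    /// in-group row id.
    ///
    /// For example (mirroring the unit test below), with `row_group_size == 5` and
    /// `num_row_groups == 3`, the global row ids `{0, 1, 2, 5, 6, 7, 8, 12}` map to
    /// `[(0, [0, 1, 2]), (1, [0, 1, 2, 3]), (2, [2])]`.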
    fn group_row_ids(
        row_ids: BTreeSet<u32>,
        row_group_size: usize,
        num_row_groups: usize,
    ) -> Vec<(usize, Vec<usize>)> {
        let est_rows_per_group = row_ids.len() / num_row_groups;

        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
        for row_id in row_ids {
            let row_group_id = row_id as usize / row_group_size;
            let row_id_in_group = row_id as usize % row_group_size;

            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
                && *rg_id == row_group_id
            {
                row_ids.push(row_id_in_group);
            } else {
                let mut row_ids = Vec::with_capacity(est_rows_per_group);
                row_ids.push(row_id_in_group);
                row_group_to_row_ids.push((row_group_id, row_ids));
            }
        }

        row_group_to_row_ids
    }

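    /// Prunes row groups by the inverted index. Returns `true` if the index is
    /// applied successfully.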
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await
        {
            Ok(output) => output,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply inverted index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        // Maps each matched segment to a row range inside its row group.
        let segment_row_count = apply_output.segment_row_count;
        let grouped_in_row_groups = apply_output
            .matched_segment_ids
            .iter_ones()
            .map(|seg_id| {
                let begin_row_id = seg_id * segment_row_count;
                let row_group_id = begin_row_id / row_group_size;

                let rg_begin_row_id = begin_row_id % row_group_size;
                let rg_end_row_id = rg_begin_row_id + segment_row_count;

                (row_group_id, rg_begin_row_id..rg_end_row_id)
            })
            .chunk_by(|(row_group_id, _)| *row_group_id);

        let ranges_in_row_groups = grouped_in_row_groups
            .into_iter()
            .map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges)));

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            ranges_in_row_groups,
            output,
            &mut metrics.rg_inverted_filtered,
            &mut metrics.rows_inverted_filtered,
        );

        true
    }

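    /// Prunes row groups by min-max statistics of the columns referenced by the
    /// predicate. Returns `true` if the predicate is applied.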
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.len();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        // Uses the expected schema for pruning when it is provided so that columns are
        // resolved by the latest metadata; otherwise falls back to the SST's own schema.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        let res = predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .filter_map(|(mask, row_group)| {
                if !*mask {
                    return None;
                }

                let selection = output.remove(&row_group)?;
                Some((row_group, selection))
            })
            .collect::<BTreeMap<_, _>>();

        let row_groups_after = res.len();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        *output = res;
        true
    }

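    /// Prunes row groups by the bloom filter index. Returns `true` if the index is
    /// applied successfully.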
    async fn prune_row_groups_by_bloom_filter(
        &self,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply(
                self.file_handle.file_id(),
                Some(file_size_hint),
                parquet_meta
                    .row_groups()
                    .iter()
                    .enumerate()
                    .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
            )
            .await
        {
            Ok(apply_output) => apply_output,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            apply_output
                .into_iter()
                .map(|(rg, ranges)| (rg, ranges.into_iter())),
            output,
            &mut metrics.rg_bloom_filtered,
            &mut metrics.rows_bloom_filtered,
        );

        true
    }

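    /// Prunes row groups by the coarse full-text index. Returns `true` if the index
    /// is applied successfully.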
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        parquet_meta: &ParquetMetaData,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };

        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_output = match index_applier
            .apply_coarse(
                self.file_handle.file_id(),
                Some(file_size_hint),
                parquet_meta
                    .row_groups()
                    .iter()
                    .enumerate()
                    .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
            )
            .await
        {
            Ok(Some(apply_output)) => apply_output,
            Ok(None) => return false,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}",
                        self.file_handle.region_id(),
                        self.file_handle.file_id(),
                        err
                    );
                } else {
                    warn!(
                        err; "Failed to apply fulltext index, region_id: {}, file_id: {}",
                        self.file_handle.region_id(), self.file_handle.file_id()
                    );
                }

                return false;
            }
        };

        Self::prune_row_groups_by_ranges(
            parquet_meta,
            apply_output
                .into_iter()
                .map(|(rg, ranges)| (rg, ranges.into_iter())),
            output,
            &mut metrics.rg_fulltext_filtered,
            &mut metrics.rows_fulltext_filtered,
        );

        true
    }

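    /// Prunes row groups by rows. `rows_in_row_groups` acts as a map from a row
    /// group index to its selected in-group row ids.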
    fn prune_row_groups_by_rows(
        parquet_meta: &ParquetMetaData,
        rows_in_row_groups: Vec<(usize, Vec<usize>)>,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        filtered_row_groups: &mut usize,
        filtered_rows: &mut usize,
    ) {
        let row_groups_before = output.len();
        let mut rows_in_row_group_before = 0;
        let mut rows_in_row_group_after = 0;

        let mut res = BTreeMap::new();
        for (row_group, row_ids) in rows_in_row_groups {
            let Some(selection) = output.remove(&row_group) else {
                continue;
            };

            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());

            let new_selection =
                row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count);
            let intersected_selection = intersect_row_selections(selection, Some(new_selection));

            let num_rows_after = intersected_selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
            rows_in_row_group_after += num_rows_after;

            if num_rows_after > 0 {
                res.insert(row_group, intersected_selection);
            }
        }

        // Everything left in `output` is filtered out entirely; count its rows as filtered.
        while let Some((row_group, selection)) = output.pop_first() {
            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
        }

        let row_groups_after = res.len();
        *filtered_row_groups += row_groups_before - row_groups_after;
        *filtered_rows += rows_in_row_group_before - rows_in_row_group_after;

        *output = res;
    }

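    /// Prunes row groups by row ranges. `ranges_in_row_groups` acts as a map from a
    /// row group index to its selected in-group row ranges.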
    fn prune_row_groups_by_ranges(
        parquet_meta: &ParquetMetaData,
        ranges_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = Range<usize>>)>,
        output: &mut BTreeMap<usize, Option<RowSelection>>,
        filtered_row_groups: &mut usize,
        filtered_rows: &mut usize,
    ) {
        let row_groups_before = output.len();
        let mut rows_in_row_group_before = 0;
        let mut rows_in_row_group_after = 0;

        let mut res = BTreeMap::new();
        for (row_group, row_ranges) in ranges_in_row_groups {
            let Some(selection) = output.remove(&row_group) else {
                continue;
            };

            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());

            let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count);
            let intersected_selection = intersect_row_selections(selection, Some(new_selection));

            let num_rows_after = intersected_selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
            rows_in_row_group_after += num_rows_after;

            if num_rows_after > 0 {
                res.insert(row_group, intersected_selection);
            }
        }

        // Everything left in `output` is filtered out entirely; count its rows as filtered.
        while let Some((row_group, selection)) = output.pop_first() {
            let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
            rows_in_row_group_before += selection
                .as_ref()
                .map_or(total_row_count, |s| s.row_count());
        }

        let row_groups_after = res.len();
        *filtered_row_groups += row_groups_before - row_groups_after;
        *filtered_rows += rows_in_row_group_before - rows_in_row_group_after;

        *output = res;
    }
}

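/// Metrics of filtering row groups and rows.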
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the full-text index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the full-text index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered precisely by pushed-down filters.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports the metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

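/// Parquet reader metrics.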
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
    /// Metrics of filtered row groups and rows.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports the total rows read.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

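/// Builder to build a [ParquetRecordBatchReader] for a row group.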
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle to keep the file from being purged while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read the file from.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache strategy.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Metadata of the parquet file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy of the reader.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches the data of the row group into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader over the fetched row group, applying the
        // row selection if any.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

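/// The state of a [ParquetReader].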
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

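/// A filter to evaluate, or the precomputed result of evaluating the filter
/// against a column's default value.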
pub(crate) enum MaybeFilter {
    /// The filter to evaluate against batches.
    Filter(SimpleFilterEvaluator),
    /// The column always matches the filter, so the filter can be ignored.
    Matched,
    /// The column never matches the filter, so the data can be pruned.
    Pruned,
}

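/// Context to evaluate a simple filter for a column in the SST.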
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate against.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the column to filter doesn't exist in the SST metadata or
    /// the expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks whether the column is present in the SST. We still use the
                // column metadata from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column is absent from the SST, so every row takes the
                        // column's default value. Evaluates the filter against that
                        // default to decide whether the SST matches or can be pruned.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

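/// Evaluates the filter against the column's default value. Returns `Some(true)`
/// if the default value doesn't match the filter (the data can be pruned),
/// `Some(false)` if it matches, and `None` if the default value can't be created
/// or the filter can't be evaluated.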
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;

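/// Parquet batch reader to read our SST format.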
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Indices of row groups to read, along with their respective row selections.
    row_groups: RowGroupMap,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in the current row group, reads the next row group.
        while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, row_selection)
                .await?;

            // Resets the source of the prune reader to the new row group.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    async fn new(
        context: FileRangeContextRef,
        mut row_groups: BTreeMap<usize, Option<RowSelection>>,
    ) -> Result<Self> {
        // Creates a reader for the first row group to read, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, row_selection)
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            row_groups,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

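/// Context of a row group reader: maps arrow results into this crate's [Result]
/// and provides the [ReadFormat] to decode record batches.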
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

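/// Reader over a row group of a parquet file, backed by a [FileRangeContextRef].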
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }
}

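/// Base reader to read a row group of a parquet file into [Batch]es.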
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the inner reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // No buffered batches; fetches the next record batch and converts it.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .convert_record_batch(&record_batch, &mut self.batches)?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

#[cfg(test)]
mod tests {
    use parquet::arrow::arrow_reader::RowSelector;
    use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
    use parquet::schema::types::{SchemaDescriptor, Type};

    use super::*;

    fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec<i64>) -> ParquetMetaData {
        let tp = Arc::new(Type::group_type_builder("test").build().unwrap());
        let schema_descr = Arc::new(SchemaDescriptor::new(tp));

        let mut row_groups = Vec::new();
        for num_rows in &num_rows_in_row_groups {
            let row_group = RowGroupMetaData::builder(schema_descr.clone())
                .set_num_rows(*num_rows)
                .build()
                .unwrap();
            row_groups.push(row_group);
        }

        let file_meta = FileMetaData::new(
            0,
            num_rows_in_row_groups.iter().sum(),
            None,
            None,
            schema_descr,
            None,
        );
        ParquetMetaData::new(file_meta, row_groups)
    }

    #[test]
    fn test_group_row_ids() {
        let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
        let row_group_size = 5;
        let num_row_groups = 3;

        let row_group_to_row_ids =
            ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups);

        assert_eq!(
            row_group_to_row_ids,
            vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
        );
    }

    #[test]
    fn prune_row_groups_by_rows_from_empty() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        let mut output = BTreeMap::new();
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert!(output.is_empty());
        assert_eq!(filtered_row_groups, 0);
        assert_eq!(filtered_rows, 0);
    }

    #[test]
    fn prune_row_groups_by_rows_from_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([
                (
                    0,
                    Some(RowSelection::from(vec![
                        RowSelector::skip(5),
                        RowSelector::select(5),
                    ]))
                ),
                (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
            ])
        );
        assert_eq!(filtered_row_groups, 1);
        assert_eq!(filtered_rows, 15);
    }

    #[test]
    fn prune_row_groups_by_rows_from_not_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_rows(
            &parquet_meta,
            rows_in_row_groups,
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_empty() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        let mut output = BTreeMap::new();
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert!(output.is_empty());
        assert_eq!(filtered_row_groups, 0);
        assert_eq!(filtered_rows, 0);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([
                (
                    0,
                    Some(RowSelection::from(vec![
                        RowSelector::skip(5),
                        RowSelector::select(5),
                    ]))
                ),
                (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
            ])
        );
        assert_eq!(filtered_row_groups, 1);
        assert_eq!(filtered_rows, 15);
    }

    #[test]
    fn prune_row_groups_by_ranges_from_not_full() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];

        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }

    #[test]
    fn prune_row_groups_by_ranges_exceed_end() {
        let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

        // The range of the last row group exceeds the row group's end.
        let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())];

        let mut output = BTreeMap::from([
            (
                0,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (
                1,
                Some(RowSelection::from(vec![
                    RowSelector::select(5),
                    RowSelector::skip(5),
                ])),
            ),
            (2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
        ]);
        let mut filtered_row_groups = 0;
        let mut filtered_rows = 0;

        ParquetReaderBuilder::prune_row_groups_by_ranges(
            &parquet_meta,
            ranges_in_row_groups.into_iter(),
            &mut output,
            &mut filtered_row_groups,
            &mut filtered_rows,
        );

        assert_eq!(
            output,
            BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
        );
        assert_eq!(filtered_row_groups, 2);
        assert_eq!(filtered_rows, 10);
    }
}