use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

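// In tests, panic on index application errors to surface bugs early; in production,
// only log a warning so the query falls back to scanning without the index.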
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

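/// Builder of [`ParquetReader`].
///
/// A minimal usage sketch; the inputs to `new` and the chosen builder options are
/// illustrative, not a prescribed configuration:
///
/// ```ignore
/// let mut reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
///     .projection(Some(column_ids))
///     .cache(cache_strategy)
///     .build()
///     .await?;
/// while let Some(batch) = reader.next_batch().await? {
///     // Consume the decoded batches.
/// }
/// ```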
pub struct ParquetReaderBuilder {
    table_dir: String,
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    predicate: Option<Predicate>,
    projection: Option<Vec<ColumnId>>,
    cache_strategy: CacheStrategy,
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    expected_metadata: Option<RegionMetadataRef>,
    flat_format: bool,
    compaction: bool,
    pre_filter_mode: PreFilterMode,
    decode_primary_key_values: bool,
}

impl ParquetReaderBuilder {
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
        }
    }

    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

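    /// Builds and returns a [`ParquetReader`] for the file.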
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

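    /// Builds the [`FileRangeContext`] and computes the row groups to read,
    /// collecting build metrics into `metrics`.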
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        let (parquet_meta, cache_miss) = self
            .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
            .await?;
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().clone()
        } else {
            Arc::new(vec![])
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                pre_filter_mode: self.pre_filter_mode,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

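    /// Decodes the [`RegionMetadata`] stored in the parquet key-value metadata under
    /// [`PARQUET_METADATA_KEY`].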
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

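    /// Reads the parquet metadata of the file, preferring cached metadata.
    ///
    /// Returns the metadata and whether it was a cache miss.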
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
    ) -> Result<(Arc<ParquetMetaData>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;

        let metadata = Arc::new(metadata);
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

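    /// Computes the row groups and row selections to read, applying min-max, fulltext,
    /// inverted index and bloom filter pruning in turn and short-circuiting once the
    /// selection becomes empty.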
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

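    /// Prunes row groups by the fine-grained (row-id based) fulltext index. Returns
    /// `true` if at least one applier produced or reused a result.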
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

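    /// Prunes row groups by the inverted index. Returns `true` if at least one applier
    /// produced or reused a result.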
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

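    /// Prunes row groups by the bloom filter index, probing only row groups that are
    /// still selected and not covered by a cached result. Returns `true` if at least
    /// one applier produced or reused a result.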
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

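    /// Prunes row groups by the coarse-grained (row-range based) fulltext index; only
    /// invoked when fine-grained fulltext pruning did not apply. Returns `true` if at
    /// least one applier produced or reused a result.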
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

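    /// Decides whether filters on field columns can be skipped during pre-filtering,
    /// according to the configured [`PreFilterMode`].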
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

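    /// Prunes row groups by min-max statistics. Returns `true` if the predicate was
    /// evaluated against the row group statistics.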
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

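    /// Applies an index result to `output`, updates the metrics, and caches the result
    /// for subsequent reads of the same file.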
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

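/// Replaces `output` with its intersection with `result` and records how many row
/// groups and rows the index filtered out.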
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

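/// Returns `true` if every non-empty row group still required by the query is covered
/// by the cached index result.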
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

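/// Metrics of row group and row filtering.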
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    pub(crate) rg_total: usize,
    pub(crate) rg_fulltext_filtered: usize,
    pub(crate) rg_inverted_filtered: usize,
    pub(crate) rg_minmax_filtered: usize,
    pub(crate) rg_bloom_filtered: usize,

    pub(crate) rows_total: usize,
    pub(crate) rows_fulltext_filtered: usize,
    pub(crate) rows_inverted_filtered: usize,
    pub(crate) rows_bloom_filtered: usize,
    pub(crate) rows_precise_filtered: usize,

    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;

        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

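/// Cache hit/miss counters and the cost of loading parquet metadata.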
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    pub(crate) mem_cache_hit: usize,
    pub(crate) file_cache_hit: usize,
    pub(crate) cache_miss: usize,
    pub(crate) metadata_load_cost: Duration,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
    }
}

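/// Overall metrics of the parquet reader.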
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    pub(crate) filter_metrics: ReaderFilterMetrics,
    pub(crate) build_cost: Duration,
    pub(crate) scan_cost: Duration,
    pub(crate) num_record_batches: usize,
    pub(crate) num_batches: usize,
    pub(crate) num_rows: usize,
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}

impl ReaderMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
    }

    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

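/// Builder that fetches a row group of the parquet file into memory and creates a
/// [`ParquetRecordBatchReader`] for it.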
pub(crate) struct RowGroupReaderBuilder {
    file_handle: FileHandle,
    file_path: String,
    parquet_meta: Arc<ParquetMetaData>,
    object_store: ObjectStore,
    projection: ProjectionMask,
    field_levels: FieldLevels,
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

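/// State of a [`ParquetReader`]: still readable, or exhausted with the final metrics.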
enum ReaderState {
    Readable(PruneReader),
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

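/// A filter to evaluate per batch, or the precomputed outcome of evaluating the filter
/// against a column's default value when the column is absent from the SST.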
pub(crate) enum MaybeFilter {
    Filter(SimpleFilterEvaluator),
    Matched,
    Pruned,
}

pub(crate) struct SimpleFilterContext {
    filter: MaybeFilter,
    column_id: ColumnId,
    semantic_type: SemanticType,
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

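/// Returns `true` if a column absent from the SST can be pruned because its default
/// value cannot match the filter; `None` if the default value cannot be evaluated.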
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

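/// Reader that returns [`Batch`]es from a parquet SST, reading the selected row groups
/// one by one.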
pub struct ParquetReader {
    context: FileRangeContextRef,
    selection: RowGroupSelection,
    reader_state: ReaderState,
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                )
                .await?;

            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

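/// Context needed by [`RowGroupReaderBase`] to map arrow results into this crate's
/// errors and expose the [`ReadFormat`].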
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

pub(crate) struct RowGroupReaderBase<T> {
    context: T,
    reader: ParquetRecordBatchReader,
    batches: VecDeque<Batch>,
    metrics: ReaderMetrics,
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

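/// Reader that returns flat-format [`RecordBatch`]es from a single row group.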
pub(crate) struct FlatRowGroupReader {
    context: FileRangeContextRef,
    reader: ParquetRecordBatchReader,
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}