use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
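/// Handles an index-apply error: panics in tests to surface the failure,
/// but only logs a warning otherwise, since callers continue without the
/// index result.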
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

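/// Parquet SST reader builder.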
pub struct ParquetReaderBuilder {
    table_dir: String,
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    predicate: Option<Predicate>,
    projection: Option<Vec<ColumnId>>,
    cache_strategy: CacheStrategy,
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    expected_metadata: Option<RegionMetadataRef>,
    flat_format: bool,
    compaction: bool,
    pre_filter_mode: PreFilterMode,
    decode_primary_key_values: bool,
}

impl ParquetReaderBuilder {
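    /// Creates a new builder to read the given SST file.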
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
        }
    }

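    /// Attaches the predicate to the builder.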
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

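    /// Attaches the projection (ids of the columns to read) to the builder.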
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

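    /// Attaches the cache strategy to the builder.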
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

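    /// Attaches the inverted index appliers to the builder.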
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

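    /// Attaches the bloom filter index appliers to the builder.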
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

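    /// Attaches the fulltext index appliers to the builder.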
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

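    /// Attaches expected region metadata, used to choose the default
    /// projection and the schema for pruning when it differs from the SST's
    /// own metadata.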
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

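    /// Sets whether to read in the flat format.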
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

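    /// Sets whether the reader is used for compaction.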
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

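    /// Sets the mode that controls pre-filtering (see [PreFilterMode]).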
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

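    /// Sets whether to decode primary key values while reading.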
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

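    /// Builds a [ParquetReader]. This performs IO as it loads the parquet
    /// metadata of the file and prunes row groups first.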
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

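    /// Builds the reader input: the [FileRangeContext] shared by readers of
    /// this file and the selection of row groups (and rows) to read.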
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        let parquet_meta = self
            .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
            .await?;
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            filters,
            read_format,
            codec,
            self.pre_filter_mode,
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

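    /// Decodes region metadata from the parquet file's key-value metadata.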
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

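    /// Reads the parquet metadata, trying the cache first and loading it from
    /// the object store on a miss.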
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
    ) -> Result<Arc<ParquetMetaData>> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok(metadata);
        }

        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;

        let metadata = Arc::new(metadata);
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok(metadata)
    }

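    /// Computes the row groups and rows to read, pruning them by min-max
    /// statistics and the available indexes.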
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

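    /// Prunes row groups by the fine-grained fulltext index. Returns `true`
    /// if a fulltext index result was applied.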
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

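    /// Prunes row groups by the inverted index. Returns `true` if an inverted
    /// index result was applied.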
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

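    /// Prunes row groups by the bloom filter index. Returns `true` if a bloom
    /// filter result was applied.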
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

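    /// Prunes row groups by the coarse-grained (bloom filter based) fulltext
    /// index. Returns `true` if a result was applied.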
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

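    /// Computes whether filtering can skip field columns, according to the
    /// pre-filter mode. For `SkipFieldsOnDelete`, fields are skipped only if
    /// some row group may contain delete op-types.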
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

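    /// Prunes row groups by min-max statistics. Returns `true` if a predicate
    /// was applied.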
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

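    /// Applies an index result to `output` and caches the result for later
    /// reads of the same predicate and file.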
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

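/// Intersects `output` with the index `result` and records how many row
/// groups and rows the index filtered out.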
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

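/// Returns true if every non-empty row group still required is already
/// covered by the cached index result.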
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

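/// Metrics of filtering rows and row groups.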
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    pub(crate) rg_total: usize,
    pub(crate) rg_fulltext_filtered: usize,
    pub(crate) rg_inverted_filtered: usize,
    pub(crate) rg_minmax_filtered: usize,
    pub(crate) rg_bloom_filtered: usize,

    pub(crate) rows_total: usize,
    pub(crate) rows_fulltext_filtered: usize,
    pub(crate) rows_inverted_filtered: usize,
    pub(crate) rows_bloom_filtered: usize,
    pub(crate) rows_precise_filtered: usize,

    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
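    /// Adds `other` metrics to this metrics.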
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;

        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

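    /// Reports the filter metrics to the global counters.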
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

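    /// Updates the per-index filtered row group and row counters.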
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

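/// Metrics of loading the parquet metadata.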
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    pub(crate) mem_cache_hit: usize,
    pub(crate) file_cache_hit: usize,
    pub(crate) cache_miss: usize,
    pub(crate) metadata_load_cost: Duration,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
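    /// Returns true if no metadata load was recorded.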
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

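    /// Adds `other` metrics to this metrics.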
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
    }
}

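/// Parquet reader metrics.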
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    pub(crate) filter_metrics: ReaderFilterMetrics,
    pub(crate) build_cost: Duration,
    pub(crate) scan_cost: Duration,
    pub(crate) num_record_batches: usize,
    pub(crate) num_batches: usize,
    pub(crate) num_rows: usize,
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}

impl ReaderMetrics {
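    /// Adds `other` metrics to this metrics.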
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
    }

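    /// Reports the number of rows read under the given read type.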
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

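/// Builder to build [ParquetRecordBatchReader]s for the row groups of a file.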
pub(crate) struct RowGroupReaderBuilder {
    file_handle: FileHandle,
    file_path: String,
    parquet_meta: Arc<ParquetMetaData>,
    object_store: ObjectStore,
    projection: ProjectionMask,
    field_levels: FieldLevels,
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
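    /// Path of the file to read.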
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

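    /// Handle of the file to read.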
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

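    /// Parquet metadata of the file.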
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

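    /// Cache strategy of the reader.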
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

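    /// Builds a [ParquetRecordBatchReader] that reads the row group at
    /// `row_group_idx`, fetching the row group into memory first.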
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

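/// The state of a [ParquetReader].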
enum ReaderState {
    Readable(PruneReader),
    Exhausted(ReaderMetrics),
}

impl ReaderState {
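    /// Returns the metrics of the reader.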
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

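/// A filter to evaluate, or the result of pruning a column that is missing
/// from the SST by its default value.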
pub(crate) enum MaybeFilter {
    Filter(SimpleFilterEvaluator),
    Matched,
    Pruned,
}

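/// Context to evaluate a simple filter against a column.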
pub(crate) struct SimpleFilterContext {
    filter: MaybeFilter,
    column_id: ColumnId,
    semantic_type: SemanticType,
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
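    /// Creates a context for the `expr`.
    ///
    /// Returns None if the column to filter doesn't exist in the SST metadata
    /// or the expected metadata.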
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

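    /// Returns the filter to evaluate.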
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

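    /// Returns the id of the column to filter.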
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

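    /// Returns the semantic type of the column.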
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

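    /// Returns the data type of the column.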
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

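/// Prunes a column missing from the SST by its default value. Returns
/// `Some(true)` if the default value doesn't match the filter, or `None` if
/// the default value can't be created or evaluated.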
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

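/// Parquet batch reader to read our SST format.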
pub struct ParquetReader {
    context: FileRangeContextRef,
    selection: RowGroupSelection,
    reader_state: ReaderState,
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                )
                .await?;

            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
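    /// Creates a new reader positioned at the first selected row group.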
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

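    /// Returns the region metadata of the SST.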
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

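/// Context for a row group reader to map read errors and access the
/// [ReadFormat].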
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

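/// Reader to read a row group of a file range.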
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
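    /// Creates a new reader.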
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

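/// Base reader that decodes [RecordBatch]es from a row group into [Batch]es.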
pub(crate) struct RowGroupReaderBase<T> {
    context: T,
    reader: ParquetRecordBatchReader,
    batches: VecDeque<Batch>,
    metrics: ReaderMetrics,
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
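    /// Creates a new reader. The context must use the primary key read format.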
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

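    /// Gets the metrics.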
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

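    /// Gets the [ReadFormat] of the underlying reader.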
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

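    /// Tries to fetch the next [RecordBatch] from the reader.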
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

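    /// Returns the next [Batch], converting fetched [RecordBatch]es as needed.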
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

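/// Reader to read a row group of a parquet file in the flat format, which
/// outputs [RecordBatch]es directly.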
pub(crate) struct FlatRowGroupReader {
    context: FileRangeContextRef,
    reader: ParquetRecordBatchReader,
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
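    /// Creates a new flat reader.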
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

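    /// Returns the next [RecordBatch], converted to the flat format.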
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}