use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

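/// Handles an error returned by an index applier: panics in test builds so
/// index failures surface early, and downgrades to a warning log otherwise so
/// a broken index only disables pruning instead of failing the read.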
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

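/// Builder of [ParquetReader]s to read a parquet SST file.
///
/// A minimal usage sketch (illustrative only; the `table_dir`, handles, and
/// stores are placeholders you must obtain from the engine):
///
/// ```ignore
/// let reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
///     .predicate(predicate)
///     .projection(Some(column_ids))
///     .cache(cache_strategy)
///     .build()
///     .await?;
/// ```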
pub struct ParquetReaderBuilder {
    table_dir: String,
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    predicate: Option<Predicate>,
    projection: Option<Vec<ColumnId>>,
    cache_strategy: CacheStrategy,
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    expected_metadata: Option<RegionMetadataRef>,
    flat_format: bool,
    compaction: bool,
    pre_filter_mode: PreFilterMode,
}

impl ParquetReaderBuilder {
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
        }
    }

    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

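    /// Builds a [ParquetReader]; this performs IO to load the file metadata
    /// and prune row groups before the reader is returned.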
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

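    /// Builds the [FileRangeContext] and the [RowGroupSelection] to read,
    /// applying min-max statistics and all available indexes to prune row
    /// groups.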
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            filters,
            read_format,
            codec,
            self.pre_filter_mode,
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

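    /// Decodes [RegionMetadata] from the JSON value stored under
    /// [PARQUET_METADATA_KEY] in the parquet key-value metadata.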
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

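    /// Reads the [ParquetMetaData] of the file, returning the cached copy when
    /// available and populating the cache after a load.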
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

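    /// Computes the [RowGroupSelection] to read, pruning row groups by min-max
    /// statistics first and then by the fulltext, inverted, and bloom filter
    /// indexes; the coarse fulltext pass only runs if the fine-grained
    /// fulltext pass did not filter anything.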
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

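    /// Prunes row groups by the fine-grained fulltext index, reusing cached
    /// index results when they cover all required row groups. Returns `true`
    /// if any fulltext applier produced a selection.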
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

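    /// Prunes row groups by the inverted index, reusing cached index results
    /// when they cover all required row groups. Returns `true` if any inverted
    /// index applier produced a selection.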
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(self.file_handle.file_id(), Some(file_size_hint))
                .await;
            let selection = match apply_res {
                Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

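    /// Prunes row groups by the bloom filter index. Row groups that are
    /// already pruned or covered by a cached result are excluded from the
    /// search, and the fresh selection is merged with the cached one before it
    /// is applied.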
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

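    /// Prunes row groups by the coarse-grained (bloom) part of the fulltext
    /// index. Only invoked when the fine-grained fulltext pass filtered
    /// nothing.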
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

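    /// Decides whether pre-filtering may skip field columns according to
    /// [PreFilterMode]; for `SkipFieldsOnDelete` this checks whether any row
    /// group in the file contains delete operations.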
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

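    /// Prunes row groups by min-max statistics. Returns `false` if there is no
    /// predicate to evaluate.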
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

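    /// Applies an index selection to `output`, updates the filter metrics, and
    /// stores the selection in the index result cache when one is configured.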
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

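/// Intersects `output` with an index `result` and records how many row groups
/// and rows the index filtered out.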
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

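/// Returns `true` if every non-empty row group still required by the selection
/// is covered by the cached index result, i.e. the cached result can be
/// applied without searching the index again.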
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

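/// Metrics of filtering rows and row groups, tracking totals before filtering
/// and the amount pruned by each filter kind.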
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    pub(crate) rg_total: usize,
    pub(crate) rg_fulltext_filtered: usize,
    pub(crate) rg_inverted_filtered: usize,
    pub(crate) rg_minmax_filtered: usize,
    pub(crate) rg_bloom_filtered: usize,

    pub(crate) rows_total: usize,
    pub(crate) rows_fulltext_filtered: usize,
    pub(crate) rows_inverted_filtered: usize,
    pub(crate) rows_bloom_filtered: usize,
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

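/// Metrics of the parquet reader: filter metrics plus costs and counts of the
/// batches and rows it produced.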
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    pub(crate) filter_metrics: ReaderFilterMetrics,
    pub(crate) build_cost: Duration,
    pub(crate) scan_cost: Duration,
    pub(crate) num_record_batches: usize,
    pub(crate) num_batches: usize,
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

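/// Builder that creates [ParquetRecordBatchReader]s for the row groups of a
/// parquet file, fetching row group data through the configured cache.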
pub(crate) struct RowGroupReaderBuilder {
    file_handle: FileHandle,
    file_path: String,
    parquet_meta: Arc<ParquetMetaData>,
    object_store: ObjectStore,
    projection: ProjectionMask,
    field_levels: FieldLevels,
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

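/// State of the [ParquetReader]: either a readable source or an exhausted one
/// that only retains its metrics.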
enum ReaderState {
    Readable(PruneReader),
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

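/// A filter to evaluate, or the precomputed result for a column missing from
/// the SST: `Matched` if the column's default value satisfies the filter,
/// `Pruned` if it does not.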
pub(crate) enum MaybeFilter {
    Filter(SimpleFilterEvaluator),
    Matched,
    Pruned,
}

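/// Context to evaluate a simple filter against a column, carrying the column
/// id, semantic type, and data type the filter applies to.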
pub(crate) struct SimpleFilterContext {
    filter: MaybeFilter,
    column_id: ColumnId,
    semantic_type: SemanticType,
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

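/// Evaluates the filter against the column's default value. Returns
/// `Some(true)` if the default value does not match the filter (so the column
/// can be pruned), and `None` if the default value is absent or the evaluation
/// fails.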
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

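/// Parquet batch reader that reads [Batch]es from the selected row groups of a
/// file range.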
pub struct ParquetReader {
    context: FileRangeContextRef,
    selection: RowGroupSelection,
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

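/// Context shared by a [RowGroupReaderBase] to convert record batches and map
/// arrow errors into this crate's [Result].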
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

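/// Reader to read a row group of a parquet file in the primary key format,
/// converting each [RecordBatch] into a queue of [Batch]es.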
pub(crate) struct RowGroupReaderBase<T> {
    context: T,
    reader: ParquetRecordBatchReader,
    batches: VecDeque<Batch>,
    metrics: ReaderMetrics,
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

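/// Reader to read a row group of a parquet file in the flat format, yielding
/// converted [RecordBatch]es directly.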
pub(crate) struct FlatRowGroupReader {
    context: FileRangeContextRef,
    reader: ParquetRecordBatchReader,
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}