use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

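/// Handles an index-apply error: panics in tests (or with the `test` feature)
/// so regressions surface, and logs a warning in production so a broken index
/// only disables the optimization instead of failing the read.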
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

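/// Builder of a [ParquetReader] over a single SST file.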
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of the columns to read; `None` reads all columns.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers used to prune row groups.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
    /// Returns a new builder to read the SST under `file_dir` via `object_store`.
    pub fn new(
        file_dir: String,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache strategy to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

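    /// Builds a [ParquetReader] that applies the configured predicate, projection,
    /// and index appliers. This performs IO to load the file and index metadata.
    ///
    /// A sketch of typical usage; the `file_dir`, `file_handle`, `object_store`,
    /// `predicate`, and `column_ids` bindings are illustrative:
    /// ```ignore
    /// let reader = ParquetReaderBuilder::new(file_dir, file_handle, object_store)
    ///     .predicate(predicate)
    ///     .projection(Some(column_ids))
    ///     .build()
    ///     .await?;
    /// ```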
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

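    /// Builds the [FileRangeContext] and the [RowGroupSelection] to read,
    /// collecting timing into `metrics`. This performs IO.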
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file and decodes the region
        // metadata stored in its key-value metadata.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // No projection: reads all columns, preferring the expected metadata
            // when available so columns added by schema change are still listed.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };

        // Computes the projection mask from the columns to read.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels for the record batch reader.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

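    /// Decodes [RegionMetadata] from the value of the `PARQUET_METADATA_KEY`
    /// entry in the parquet key-value metadata.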
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

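    /// Reads the [ParquetMetaData] of the file, returning the cached copy when
    /// available and caching a freshly loaded one otherwise.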
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let region_id = self.file_handle.region_id();
        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the cache.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(region_id, file_id)
            .await
        {
            return Ok(metadata);
        }

        // Cache miss: loads the metadata directly from the file.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the metadata for later reads.
        self.cache_strategy.put_parquet_meta_data(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            metadata.clone(),
        );

        Ok(metadata)
    }

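    /// Computes the [RowGroupSelection] to read, pruning row groups with
    /// min-max statistics, then the fulltext index, the inverted index, and
    /// the bloom filter index, returning early whenever the selection becomes
    /// empty. The coarse fulltext pass runs last, and only if the fine
    /// fulltext pass did not apply.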
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Uses the size of the first row group as the nominal row group size;
        // row groups other than the last are expected to share it.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        // Falls back to the coarse fulltext pass only when the fine fulltext
        // index pass did not apply.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

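    /// Prunes row groups by the fine-grained fulltext index. Returns `true`
    /// when the index was applied, so the caller can skip the coarse pass.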
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Reuses the cached index result if it already covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

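    /// Prunes row groups by the inverted index. Returns `true` when the index
    /// was applied.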
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

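    /// Prunes row groups by the bloom filter index. Returns `true` when the
    /// index was applied.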
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // For each row group, reports its row count and whether the index still
        // needs to search it: the row group must be selected and not already
        // covered by the cached result.
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // Merges the newly searched row groups with the cached result.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

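    /// Prunes row groups by the coarse fulltext index (`apply_coarse`).
    /// Returns `true` when the index was applied.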
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

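    /// Prunes row groups by min-max statistics. Returns `true` when a
    /// predicate is present and pruning was attempted.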
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        // Prunes with the expected region schema when provided, to handle
        // columns changed by schema change.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // A `false` in the mask means the row group cannot match the predicate.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

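/// Intersects `output` with an index `result` and records how many row groups
/// and rows the index filtered out. For example, if `output` selects row
/// groups {0, 1, 2} and `result` selects {1, 3}, `output` becomes {1} and the
/// metrics grow by the two dropped row groups and their rows.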
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

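/// Returns `true` if every non-empty row group still required by the read is
/// already covered by the cached index result.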
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered precisely by the pushed-down filters.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports the metrics to the global counters.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    /// Adds the filtered row group and row counts to the counters of `index_type`.
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports the total rows read under the `read_type` label.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

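/// Builder to build a [ParquetRecordBatchReader] for a row group of an SST file.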
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read. Holding the handle keeps the file from being purged
    /// while it is read.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read the file.
    object_store: ObjectStore,
    /// Projection mask of the columns to read.
    projection: ProjectionMask,
    /// Field levels to build the arrow reader.
    field_levels: FieldLevels,
    /// Cache strategy.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at
    /// `row_group_idx`, fetching the needed pages into memory first.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the record batch reader over the in-memory row group.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

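/// The state of a [ParquetReader].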
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted; only the metrics remain.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

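/// A filter to evaluate against batches, or a precomputed verdict for a column
/// that is absent from the SST (see [pruned_by_default]).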
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter always matches in this file, so evaluation can be skipped.
    Matched,
    /// The filter matches no rows in this file, so it can be pruned.
    Pruned,
}

/// Context to evaluate a column filter against a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate against.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the expression is not a simple filter or the column
    /// to filter does not exist in the metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is also present in the SST metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column is not in the SST, so every row takes the
                        // column's default value: evaluate the filter against it.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

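/// Evaluates the filter against the column's default value. Returns
/// `Some(true)` if the default value does not match the filter (the file can
/// be pruned for this column), `Some(false)` if it matches, and `None` if the
/// default value cannot be created or evaluated.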
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

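/// Parquet SST reader that reads [Batch]es from the selected row groups of one file.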
pub struct ParquetReader {
    /// File range context shared by all row group readers.
    context: FileRangeContextRef,
    /// Remaining row groups (and their row selections) to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // Reads the current row group first.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // The current row group is exhausted: advances to the next selected one.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the source of the prune reader to the new row group.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted; keeps the metrics for reporting on drop.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader, eagerly building the reader of the first selected
    /// row group.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

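/// Context shared by a row group reader: maps arrow read results into this
/// crate's [Result] and exposes the [ReadFormat] for decoding batches.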
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads batches from a row group of an SST file.
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }
}

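/// Reader to read a row group of a parquet file, generic over the context type.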
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // No buffered batches: fetches and converts record batches until one
        // yields at least a batch or the reader is exhausted.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .convert_record_batch(&record_batch, &mut self.batches)?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}