use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

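/// Index type names used in logs and metrics.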
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

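/// Handles an error from applying an index: panics in test builds so the failure
/// surfaces early, and only logs a warning otherwise, since a failed index apply
/// merely disables that pruning step.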
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

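/// Parquet SST reader builder.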
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection can contain
    /// columns not present in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST, usually the latest
    /// metadata of the region. The reader uses it to resolve the column ids to read.
    expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
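    /// Returns a new [ParquetReaderBuilder] to read a specific SST.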
    pub fn new(
        file_dir: String,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

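    /// Attaches the predicate to the builder.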
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

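    /// Attaches the projection to the builder.
    ///
    /// `None` means reading all columns.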
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

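    /// Attaches the cache strategy to the builder.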
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

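    /// Attaches the inverted index applier to the builder.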
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

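    /// Attaches the bloom filter index applier to the builder.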
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

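    /// Attaches the fulltext index applier to the builder.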
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

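    /// Attaches the expected metadata to the builder, usually the latest metadata
    /// of the region.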
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

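    /// Builds and initializes a [ParquetReader].
    ///
    /// This needs to perform IO operations.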
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

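    /// Builds a [FileRangeContext] and computes the row groups to read.
    ///
    /// This needs to perform IO operations.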
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes the region metadata stored in the file.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // No explicit projection: reads the columns of the expected (usually latest)
            // metadata if available, otherwise the columns of the file itself.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };

        // Computes the projection mask from the parquet schema.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels for the record batch reader.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

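    /// Decodes region metadata from the parquet key value metadata.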
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

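    /// Reads the metadata of the parquet file, going through the cache first.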
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let region_id = self.file_handle.region_id();
        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the cache.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(region_id, file_id)
            .await
        {
            return Ok(metadata);
        }

        // Cache miss, loads the metadata from the file.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the metadata.
        self.cache_strategy.put_parquet_meta_data(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            metadata.clone(),
        );

        Ok(metadata)
    }

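    /// Computes the row groups to read, along with their respective row selections,
    /// by applying min-max pruning and the available indexes.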
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Assumes that the number of rows in the first row group represents the
        // `row_group_size` of the parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

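    /// Prunes row groups by the fulltext index. Returns `true` if the index is
    /// successfully applied.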
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: returns early if the result is in the cache.
        if self.index_result_cache_get(
            predicate_key,
            self.file_handle.file_id(),
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        ) {
            return true;
        }

        // Slow path: applies the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => {
                RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups())
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

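    /// Prunes row groups by the inverted index. Returns `true` if the index is
    /// successfully applied.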
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: returns early if the result is in the cache.
        if self.index_result_cache_get(
            predicate_key,
            self.file_handle.file_id(),
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        ) {
            return true;
        }

        // Slow path: applies the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => {
                RowGroupSelection::from_inverted_index_apply_output(row_group_size, output)
            }
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

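    /// Prunes row groups by the bloom filter index. Returns `true` if the index is
    /// successfully applied.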
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: returns early if the result is in the cache.
        if self.index_result_cache_get(
            predicate_key,
            self.file_handle.file_id(),
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        ) {
            return true;
        }

        // Slow path: applies the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // Row count and current selection state of each row group.
        let rgs = parquet_meta
            .row_groups()
            .iter()
            .enumerate()
            .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

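    /// Coarsely prunes row groups by the bloom filter part of the fulltext index.
    /// Returns `true` if the index is successfully applied.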
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: returns early if the result is in the cache.
        if self.index_result_cache_get(
            predicate_key,
            self.file_handle.file_id(),
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        ) {
            return true;
        }

        // Slow path: applies the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // Row count and current selection state of each row group.
        let rgs = parquet_meta
            .row_groups()
            .iter()
            .enumerate()
            .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

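    /// Prunes row groups by min-max statistics. Returns `true` if the predicate
    /// is applied.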
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Prunes with the expected schema if available; columns that are absent from
        // the SST simply yield no statistics and thus cannot prune row groups.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

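    /// Looks up the index result cache. On a hit, applies the cached selection to
    /// `output` and returns `true`.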
    fn index_result_cache_get(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) -> bool {
        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            let result = index_result_cache.get(predicate_key, file_id);
            if let Some(result) = result {
                apply_selection_and_update_metrics(output, &result, metrics, index_type);
                return true;
            }
        }
        false
    }

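    /// Applies the index `result` to `output`, updates the filter metrics, and
    /// caches the result if the index result cache is enabled.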
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

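/// Intersects `output` with `result`, records the number of row groups and rows
/// filtered out under the given index type, and writes the intersection back to
/// `output`.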
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

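/// Metrics of filtered row groups and rows.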
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row groups before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by the precise filter.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
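    /// Merges `other` into these metrics.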
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

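    /// Reports the metrics to the global counters.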
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

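    /// Records `row_group_count` and `row_count` as filtered by the given index type.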
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

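/// Parquet reader metrics.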
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
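    /// Merges `other` into these metrics.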
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

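    /// Reports the number of rows read under the given read type label.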
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

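/// Builder to build [ParquetRecordBatchReader]s for the row groups of a file.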
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the underlying file is not purged while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read the file.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache strategy.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
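    /// Path of the file to read.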
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Metadata of the parquet file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy of the reader.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

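    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`,
    /// fetching the required pages into memory first.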
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data of the row group into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader from the in-memory row group.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

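/// State of [ParquetReader].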
enum ReaderState {
    /// The reader is readable.
    Readable(PruneReader),
    /// The reader is exhausted, only keeping the metrics.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

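/// The filter to evaluate, or the result of evaluating the filter against the
/// column's default value.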
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The column's default value matches the filter.
    Matched,
    /// The column is pruned by its default value.
    Pruned,
}

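/// Context to evaluate a simple column filter against a parquet file.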
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate against.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
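    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the expression is not a simple filter, or the column to
    /// filter doesn't exist in the SST metadata or the expected metadata.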
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST. We still use the column
                // from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column is not in the SST, so evaluates the filter against
                        // the default value of the column.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the id of the column to filter.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

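/// Evaluates the filter against the default value of the column. Returns `Some(true)`
/// if the default value doesn't match the filter (the column can be pruned), and
/// `None` if the column has no default value or the evaluation fails.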
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

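/// Parquet batch reader to read our SST format.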
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selections to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // Reads batches from the current source.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // The current source is exhausted, advances to the next row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the source of the reader to the new row group.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted, only keeps the metrics.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports the metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
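    /// Creates a [ParquetReader] from the file range context and the row group
    /// selection.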
    async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result<Self> {
        // Initializes the reader state with the first row group, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

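    /// Returns the metadata of the SST.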
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

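/// Context a [RowGroupReaderBase] needs: maps arrow read results into our [Result]
/// type and exposes the [ReadFormat] to decode record batches.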
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

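/// Reader to read a row group of a parquet file.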
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }
}

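/// Base implementation of a row group reader that decodes [RecordBatch]es into
/// [Batch]es.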
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
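    /// Creates a new reader to read batches with the given context.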
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        while self.batches.is_empty() {
            // Fetches the next record batch and converts it into `Batch`es.
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .convert_record_batch(&record_batch, &mut self.batches)?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}