use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

/// Handles an index-apply error: panics under test configurations so index bugs
/// surface early, but only logs a warning in production so a broken index never
/// fails a query.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

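/// Builder for a [`ParquetReader`] over a single SST file.
///
/// A minimal usage sketch (not a doc-test; it assumes a `file_dir`, `path_type`,
/// `file_handle`, and `object_store` are already at hand):
///
/// ```ignore
/// let reader = ParquetReaderBuilder::new(file_dir, path_type, file_handle, object_store)
///     .predicate(predicate)
///     .projection(Some(column_ids))
///     .cache(cache_strategy)
///     .build()
///     .await?;
/// ```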
pub struct ParquetReaderBuilder {
    /// Directory of the SST file.
    file_dir: String,
    /// Path type used to build the file path.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of the columns to read; reads all columns in the expected metadata
    /// when `None`.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data and index results.
    cache_strategy: CacheStrategy,
    /// Index appliers used to prune row groups.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected (usually the latest) metadata of the region; used to resolve
    /// column names to ids when the SST was written with older metadata.
    expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
    /// Creates a new builder for the given SST file.
    pub fn new(
        file_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache strategy to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata of the region to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

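    /// Builds a [`ParquetReader`] for the SST file. This performs IO: it loads
    /// the parquet metadata and prunes row groups before creating the reader.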
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

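    /// Builds the [`FileRangeContext`] and the initial [`RowGroupSelection`]
    /// for the file, applying all available pruning before any data is read.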
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes the region metadata embedded in the file.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // No projection: reads all columns, preferring the expected metadata
            // when it is available.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask over the parquet schema.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels for the projected columns.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

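    /// Decodes [`RegionMetadata`] from the value of the `PARQUET_METADATA_KEY`
    /// entry in the parquet key-value metadata.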
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

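    /// Reads the parquet metadata of the file, consulting the metadata cache
    /// first and populating it on a miss.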
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get the parquet metadata from the cache.
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        // Cache miss: loads the metadata from the object store.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the metadata for later reads.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

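    /// Computes the row groups (and row ranges within them) to read.
    ///
    /// Pruning runs as a cascade: min-max statistics first, then the
    /// fine-grained fulltext index, the inverted index, the bloom filter
    /// index, and finally the coarse fulltext bloom filter (only when the
    /// fine-grained fulltext index was not applied). Each stage narrows the
    /// selection and the cascade stops early once the selection is empty.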
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Assumes all row groups in the file share the size of the first one.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

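    /// Prunes row groups by the fine-grained fulltext index. Returns `true`
    /// if the index was applied, so the coarse fulltext pass can be skipped.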
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses a cached index result that covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: applies the index to the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

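    /// Prunes row groups by the inverted index. Returns `true` if the index
    /// was applied.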
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses a cached index result that covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                return true;
            }
        }

        // Slow path: applies the index to the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

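    /// Prunes row groups by the bloom filter index. Returns `true` if the
    /// index was applied.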
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses a cached index result that covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                return true;
            }
        }

        // Slow path: only searches row groups that are still selected and not
        // already covered by the cached result.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // Merges the newly searched row groups with the cached result.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

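    /// Prunes row groups by the coarse fulltext bloom filter. Returns `true`
    /// if the index was applied.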
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses a cached index result that covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: only searches row groups that are still selected and not
        // already covered by the cached result.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        // Merges the newly searched row groups with the cached result.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

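    /// Prunes row groups by min-max statistics. Returns `true` if a predicate
    /// was available to prune with.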
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        // Prefers the expected (latest) schema for pruning when available so
        // column names resolve to the right columns.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Removes row groups that the predicate cannot match.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    /// Applies the index `result` to `output`, updates metrics, and caches
    /// the result for later reads.
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

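/// Intersects `output` with an index `result`, then records how many row
/// groups and rows the index filtered out.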
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

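/// Returns `true` if every non-empty row group still required by
/// `required_row_groups` is covered by `cached_row_groups`, i.e. the cached
/// result alone is sufficient and the index need not be applied again.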
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // Empty row groups are already pruned and need no search.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // Non-empty row groups must be covered by the cached result.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtered row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports the metrics to the global counters.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    /// Updates the per-index filtering counters.
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read from the parquet file.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded from the record batches.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to these metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports the total number of rows read.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [`ParquetRecordBatchReader`] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// Handle of the SST file to read.
    ///
    /// Holds the file handle to keep the underlying file alive while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read from.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [`ParquetRecordBatchReader`] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches the row group data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader over the in-memory row group.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// State of the [`ParquetReader`].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted; keeps the final metrics.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// A filter that may be evaluated, or resolved trivially against a whole file.
pub(crate) enum MaybeFilter {
    /// The filter must be evaluated against batches.
    Filter(SimpleFilterEvaluator),
    /// The filter always matches (the column's default value satisfies it).
    Matched,
    /// The filter can never match, so the file is pruned.
    Pruned,
}

/// Context to evaluate a simple filter against batches.
pub(crate) struct SimpleFilterContext {
    /// The filter to evaluate, or its trivial resolution.
    filter: MaybeFilter,
    /// Id of the column to filter.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the expression is not a simple filter or the column
    /// it refers to does not exist in the metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Resolves the column by name from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks whether the column also exists in the SST.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column is not in the SST, so it is read as its
                        // default value. Evaluates the filter against the
                        // default to decide whether the whole file matches or
                        // is pruned.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the id of the column to filter.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

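/// Returns `Some(true)` if the filter rejects the column's default value, in
/// which case a file without this column can be pruned entirely. Returns
/// `None` if the default value is unavailable or cannot be evaluated.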
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Remaining row group selection to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // Returns the next batch from the current row group, if any.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // The current row group is exhausted; advances to the next selected row group.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the prune reader to read the new row group.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // All row groups are exhausted; keeps the metrics for reporting on drop.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports the metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Initializes the reader state from the first selected row group, or
        // starts exhausted if the selection is empty.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

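/// Context needed by a [`RowGroupReaderBase`] to map read results into this
/// crate's error type and to get the read format.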
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [`RowGroupReaderBase`] that reads under a [`FileRangeContextRef`].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet record batch reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences in record batches.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read `reader` under `context`.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // Creates a reusable sequence array for overriding, if needed.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the read format.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [`RecordBatch`] from the inner reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [`Batch`].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // No buffered batches: fetches the next record batch and converts it.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context.read_format().convert_record_batch(
                &record_batch,
                self.override_sequence.as_ref(),
                &mut self.batches,
            )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}