//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

// Panics if the index cannot be applied in tests; only logs a warning otherwise.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of columns to read; `None` reads all columns.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Applier of the inverted index.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Applier of the bloom filter index.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Applier of the fulltext index.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    ///
    /// The reader uses it to get the correct column ids by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to read the SST in the flat format.
    flat_format: bool,
    /// Whether the reader is for compaction.
    compaction: bool,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST file.
    pub fn new(
        file_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
            flat_format: false,
            compaction: false,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache strategy to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets whether to read the SST in the flat format.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether the reader is for compaction.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

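    /// Builds a [ParquetReader] that reads the SST file.
    ///
    /// A minimal usage sketch; `path_type`, `file_handle`, `object_store`, and
    /// `cache_strategy` are hypothetical values assumed to be in scope, and the
    /// block is `ignore`d because it needs a real SST file and an async context:
    ///
    /// ```ignore
    /// let mut reader =
    ///     ParquetReaderBuilder::new("data/".to_string(), path_type, file_handle, object_store)
    ///         .cache(cache_strategy)
    ///         .build()
    ///         .await?;
    /// // Consume batches via `reader.next_batch().await?`.
    /// ```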
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds the reader input: the context to create file ranges and the
    /// row group selection to read.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes region metadata from the key value metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        // Builds the read format with the projected column ids.
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            // Lists all column ids to read; uses the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from the key value metadata of the parquet file.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of a specific file.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the cache first.
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        // Cache miss, loads the metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Caches the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Assumes the number of rows in the first row group represents the
        // `row_group_size` of the parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        // The fine-grained fulltext index wasn't applied; falls back to the coarse one.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by the fine-grained fulltext index. Returns `true` if the index is applied.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses the cached result if it covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            return true;
        }

        // Slow path: applies the fine-grained fulltext index to get matched row ids.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Applies the inverted index to prune row groups. Returns `true` if the
    /// index is applied.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses the cached result if it covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
            return true;
        }

        // Slow path: applies the inverted index to compute the selection.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses the cached result if it covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
            return true;
        }

        // For each row group, prepares (num rows, whether to search it).
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Only searches row groups that are still required and not
                // covered by the cached result.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // Merges the cached selection of the row groups we skipped searching.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: reuses the cached result if it covers all required row groups.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            return true;
        }

        // For each row group, prepares (num rows, whether to search it).
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Only searches row groups that are still required and not
                // covered by the cached result.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        // Merges the cached selection of the row groups we skipped searching.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Prunes row groups by min-max values. Returns `true` if the predicate is applied.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Computes the mask of row groups to keep by evaluating the predicate
        // against the min-max statistics, then removes the pruned row groups.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

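/// Returns whether the cached index result covers every row group that the
/// reader still needs to search.
///
/// A sketch of the semantics with hypothetical selections (`ignore`d because
/// building a [RowGroupSelection] needs more setup than shown):
///
/// ```ignore
/// // Suppose `required` still selects row groups 0 and 2.
/// // A cached result covering only row group 0 forces a re-search:
/// assert!(!all_required_row_groups_searched(&required, &cached_rg0));
/// // A cached result covering row groups 0 and 2 can be applied directly:
/// assert!(all_required_row_groups_searched(&required, &cached_rg0_and_rg2));
/// ```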
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // The row group is already pruned (empty in the required selection).
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // Or the cached result covers the row group.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max values.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row groups before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    /// Reports the metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Metrics of filtered row groups and rows.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    /// Reports the total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle to prevent the underlying file from being purged
    /// while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache strategy.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data of the row group into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader over the in-memory row group with the
        // optional row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// A filter to evaluate, or an already-known evaluation result.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches all rows.
    Matched,
    /// The filter filters out all rows.
    Pruned,
}

/// Context to evaluate a simple filter against a column.
pub(crate) struct SimpleFilterContext {
    /// The filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the column to filter doesn't exist in the SST metadata
    /// or the expected metadata.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks whether the column is also in the SST.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column isn't in the SST, so evaluates the filter
                        // against the default value of the column.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the id of the column to filter.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Returns `true` if the default value of the column is pruned by the filter.
///
/// Returns `None` if the filter can't be evaluated with the default value.
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

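/// Parquet batch reader to read an SST file.
///
/// A minimal read-loop sketch; `builder` is a hypothetical [ParquetReaderBuilder]
/// and the block is `ignore`d because it needs a real SST file and an async
/// context:
///
/// ```ignore
/// let mut reader = builder.build().await?;
/// while let Some(batch) = reader.next_batch().await? {
///     // Each `Batch` contains rows of the same primary key.
/// }
/// ```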
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // Reads from the current row group first.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // The current row group is exhausted, advances to the next one.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            // Resets the source of the reader to read the new row group.
            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports the metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Builds a reader for the first row group if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

pub(crate) trait RowGroupReaderContext: Send {
    /// Maps the result from the arrow reader to the result of this crate.
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from the file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader that converts [RecordBatch]es into [Batch]es.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // Creates a sequence array to override sequences if necessary.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the read format.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // No buffered batches, fetches the next record batch and converts it.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: the primary key format is checked in `create()`.
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in the flat format.
pub(crate) struct FlatRowGroupReader {
    /// Context of the reader.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new reader from the file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // Creates a sequence array to override sequences if necessary.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next [RecordBatch] in the flat format.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;
                // Safety: this reader is only built for the flat format.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}