use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

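/// Handles an error returned by an index applier: panics in tests (and with the
/// `test` feature) so index failures are loud, otherwise logs a warning so the
/// read can proceed without the index.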
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

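/// Parquet SST reader builder.
///
/// A minimal usage sketch; `file_dir`, `path_type`, `file_handle`,
/// `object_store`, `predicate`, `column_ids`, and `cache_strategy` are assumed
/// to be constructed by the caller:
///
/// ```ignore
/// let reader = ParquetReaderBuilder::new(file_dir, path_type, file_handle, object_store)
///     .predicate(Some(predicate))
///     .projection(Some(column_ids))
///     .cache(cache_strategy)
///     .build()
///     .await?;
/// ```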
pub struct ParquetReaderBuilder {
    file_dir: String,
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    predicate: Option<Predicate>,
    projection: Option<Vec<ColumnId>>,
    cache_strategy: CacheStrategy,
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    expected_metadata: Option<RegionMetadataRef>,
    flat_format: bool,
}

impl ParquetReaderBuilder {
    pub fn new(
        file_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
            flat_format: false,
        }
    }

    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

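    /// Builds a [ParquetReader] to read the SST file.
    ///
    /// This needs to load the parquet footer first, so the build is asynchronous.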
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

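    /// Builds the [FileRangeContext] and the [RowGroupSelection] to read,
    /// collecting build metrics into `metrics`.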
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                column_ids.iter().copied(),
                self.flat_format,
            )
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
                self.flat_format,
            )
        };
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

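    /// Decodes [RegionMetadata] from the parquet key-value metadata.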
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

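    /// Reads the parquet footer metadata of the file, preferring the metadata cache.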
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
            return Ok(metadata);
        }

        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        Ok(metadata)
    }

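    /// Computes the [RowGroupSelection] to read, pruning row groups by min-max
    /// statistics first and then by the available indexes.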
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

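    /// Prunes row groups by the fine-grained fulltext index. Returns `true` if
    /// the index was applied to `output`.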
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

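    /// Prunes row groups by the inverted index. Returns `true` if the index was
    /// applied to `output`.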
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

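    /// Prunes row groups by the bloom filter index. Returns `true` if the index
    /// was applied to `output`.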
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

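    /// Prunes row groups by the coarse-grained (bloom filter based) fulltext
    /// index. Returns `true` if the index was applied to `output`.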
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

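    /// Prunes row groups by min-max statistics. Returns `true` if the predicate
    /// was applied.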
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

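/// Intersects `output` with the index `result` and records how many row groups
/// and rows the index filtered out.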
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

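/// Returns `true` if every non-empty row group still required by
/// `required_row_groups` is present in the cached result, i.e. the cached index
/// result already covers all row groups we need.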
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

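/// Metrics of filtering rows and row groups during the read.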
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    pub(crate) rg_total: usize,
    pub(crate) rg_fulltext_filtered: usize,
    pub(crate) rg_inverted_filtered: usize,
    pub(crate) rg_minmax_filtered: usize,
    pub(crate) rg_bloom_filtered: usize,

    pub(crate) rows_total: usize,
    pub(crate) rows_fulltext_filtered: usize,
    pub(crate) rows_inverted_filtered: usize,
    pub(crate) rows_bloom_filtered: usize,
    pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;
    }

    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

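/// Parquet reader metrics.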
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    pub(crate) filter_metrics: ReaderFilterMetrics,
    pub(crate) build_cost: Duration,
    pub(crate) scan_cost: Duration,
    pub(crate) num_record_batches: usize,
    pub(crate) num_batches: usize,
    pub(crate) num_rows: usize,
}

impl ReaderMetrics {
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
    }

    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

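/// Builder to build a [ParquetRecordBatchReader] for a row group of the file.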
pub(crate) struct RowGroupReaderBuilder {
    file_handle: FileHandle,
    file_path: String,
    parquet_meta: Arc<ParquetMetaData>,
    object_store: ObjectStore,
    projection: ProjectionMask,
    field_levels: FieldLevels,
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

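/// State of the [ParquetReader]: either readable with a [PruneReader], or
/// exhausted with the final metrics.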
enum ReaderState {
    Readable(PruneReader),
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

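/// A simple filter to evaluate, or its precomputed result for an SST that
/// doesn't contain the filtered column: `Matched` means every row matches,
/// `Pruned` means the whole file is filtered out.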
pub(crate) enum MaybeFilter {
    Filter(SimpleFilterEvaluator),
    Matched,
    Pruned,
}

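/// Context to evaluate a simple filter against a column of the SST.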
pub(crate) struct SimpleFilterContext {
    filter: MaybeFilter,
    column_id: ColumnId,
    semantic_type: SemanticType,
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

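/// Returns `Some(true)` if the column's default value never matches the filter,
/// so rows of an SST that lacks the column can all be pruned; returns `None` if
/// the default value can't be created or evaluated.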
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

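/// Parquet batch reader that yields [Batch]es from the selected row groups.
///
/// A consumption sketch (construction via [ParquetReaderBuilder] elided):
///
/// ```ignore
/// while let Some(batch) = reader.next_batch().await? {
///     // process the batch
/// }
/// ```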
pub struct ParquetReader {
    context: FileRangeContextRef,
    selection: RowGroupSelection,
    reader_state: ReaderState,
}

#[async_trait]
impl BatchReader for ParquetReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;

            reader.reset_source(Source::RowGroup(RowGroupReader::new(
                self.context.clone(),
                parquet_reader,
            )));
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

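/// Context needed by a [RowGroupReaderBase] to map read results and access the
/// [ReadFormat].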
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

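/// [RowGroupReaderBase] with a [FileRangeContextRef] as its context.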
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

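/// Base reader that decodes [Batch]es from the [RecordBatch]es of a row group.
/// The read format must be the primary key format.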
pub(crate) struct RowGroupReaderBase<T> {
    context: T,
    reader: ParquetRecordBatchReader,
    batches: VecDeque<Batch>,
    metrics: ReaderMetrics,
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

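/// Reader that yields flat [RecordBatch]es from a row group, overriding
/// sequences if needed.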
pub(crate) struct FlatRowGroupReader {
    context: FileRangeContextRef,
    reader: ParquetRecordBatchReader,
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                if let (Some(flat_format), Some(override_array)) = (
                    self.context.read_format().as_flat(),
                    &self.override_sequence,
                ) {
                    let converted =
                        flat_format.convert_batch(record_batch, Some(override_array))?;
                    return Ok(Some(converted));
                }

                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}