#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
    row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
use crate::sst::tag_maybe_to_dictionary_field;

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";

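/// Handles an index apply error: panics in test builds so failures surface
/// immediately, and logs a warning otherwise.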
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

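/// Builder of [ParquetReader]s that read row groups from an SST file.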
pub struct ParquetReaderBuilder {
    /// Directory of the table data.
    table_dir: String,
    /// Path type to build the file path.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Ids of the columns to read.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Inverted index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to read the file in the flat format.
    flat_format: bool,
    /// Whether the reader is built for compaction.
    compaction: bool,
    /// Mode for pre-filtering columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values.
    decode_primary_key_values: bool,
    /// Policy for reading the parquet page index.
    page_index_policy: PageIndexPolicy,
}

impl ParquetReaderBuilder {
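    /// Returns a new [ParquetReaderBuilder] to read the given SST file.
    ///
    /// A minimal usage sketch; the `table_dir`, `path_type`, `file_handle`,
    /// `object_store`, `predicate` and `column_ids` values are illustrative
    /// placeholders, not defined in this module:
    /// ```ignore
    /// let reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
    ///     .predicate(Some(predicate))
    ///     .projection(Some(column_ids))
    ///     .cache(CacheStrategy::Disabled)
    ///     .build()
    ///     .await?;
    /// ```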
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
            page_index_policy: Default::default(),
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache strategy to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the full-text index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the vector index applier and the `k` for top-k search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
        k: Option<usize>,
    ) -> Self {
        self.vector_index_applier = applier;
        self.vector_index_k = k;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets whether to read the file in the flat format.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether the reader is built for compaction.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Sets whether to decode primary key values.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

    /// Sets the page index policy.
    #[must_use]
    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.page_index_policy = page_index_policy;
        self
    }

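    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.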
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

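    /// Builds a [FileRangeContext] and collects the row group selection to read.
    ///
    /// This needs to perform IO operations.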
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads the parquet metadata of the file.
        let (parquet_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        // Decodes the region metadata from the key-value metadata of the file.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let region_partition_expr = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr.as_ref().map(|expr| expr.as_str()),
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        let skip_auto_convert = self.compaction && is_same_region_partition;

        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && self.flat_format
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            // Lists all columns when no projection is given.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        // Downloads the file in the background on a metadata cache miss when
        // there are row groups to read.
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().clone()
        } else {
            Arc::new(vec![])
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

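    /// Parses the region partition expression and checks whether it matches
    /// the partition expression stored in the file metadata.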
    fn is_same_region_partition(
        region_partition_expr_str: Option<&str>,
        file_partition_expr: Option<&PartitionExpr>,
    ) -> Result<(Option<PartitionExpr>, bool)> {
        let region_partition_expr = match region_partition_expr_str {
            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
            None => None,
        };

        let is_same = region_partition_expr.as_ref() == file_partition_expr;
        Ok((region_partition_expr, is_same))
    }

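    /// Builds the partition filter context if the file's partition expression
    /// differs from the region's, so rows outside the region can be filtered out.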
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collects the columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Builds a schema that only contains the referenced columns. In the
        // flat format, string tag columns may be stored as dictionaries.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }

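    /// Decodes the region metadata from the parquet key-value metadata.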
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

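    /// Reads the parquet metadata of the file, preferring the cache.
    ///
    /// Returns the metadata and a flag indicating whether the cache missed.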
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<ParquetMetaData>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get the metadata from the cache first.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss: loads the metadata directly from the file.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(metadata);
        // Caches the metadata for later reads.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

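    /// Computes the row groups and rows to read, applying min-max pruning and
    /// all available index appliers in turn.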
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Takes the size of the first row group as the uniform row group size.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }

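    /// Prunes row groups by the fine-grained full-text index. Returns `true`
    /// if the index was applied.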
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Checks the index result cache first.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

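    /// Applies the inverted index to prune row groups. Returns `true` if the
    /// index was applied.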
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

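    /// Prunes row groups by the bloom filter index. Returns `true` if the
    /// index was applied.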
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Only searches row groups that are still selected and
                    // not covered by the cached result.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Merges the cached result into the fresh selection.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

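    /// Prunes row groups by the vector index, keeping only the top-k rows
    /// returned by the applier.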
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }

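    /// Prunes row groups by the coarse-grained part of the full-text index.
    /// Returns `true` if the index was applied.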
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

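    /// Computes whether field columns can be skipped when pre-filtering,
    /// based on the configured [PreFilterMode].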
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Skips fields only when some row group may contain delete ops.
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

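    /// Prunes row groups by min-max statistics. Returns `true` if the
    /// predicate was applied.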
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

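/// Intersects `output` with the index `result` and records how many row
/// groups and rows the index filtered out.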
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

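/// Converts vector index row offsets into a [RowGroupSelection], rejecting
/// offsets that do not fit in `u32`.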
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let mut row_ids = BTreeSet::new();
    for offset in row_offsets {
        let row_id = u32::try_from(offset).map_err(|_| {
            ApplyVectorIndexSnafu {
                reason: format!("Row offset {} exceeds u32::MAX", offset),
            }
            .build()
        })?;
        row_ids.insert(row_id);
    }
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}

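/// Returns `true` if every non-empty row group required by the selection is
/// already covered by the cached result.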
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group is satisfied if it selects no rows or if the cached
        // result has already searched it.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

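/// Metrics of filtering rows and row groups while building the reader.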
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the full-text index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by the vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the full-text index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by the vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by the vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filtering.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for the full-text index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for the full-text index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for the inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for the inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for the bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for the bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,

    /// Detailed apply metrics of the inverted index.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed apply metrics of the bloom filter index.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed apply metrics of the full-text index.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;
        self.rg_vector_filtered += other.rg_vector_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_vector_filtered += other.rows_vector_filtered;
        self.rows_vector_selected += other.rows_vector_selected;
        self.rows_precise_filtered += other.rows_precise_filtered;

        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;

        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports metrics to the global counters.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            INDEX_TYPE_VECTOR => {
                self.rg_vector_filtered += row_group_count;
                self.rows_vector_filtered += row_count;
            }
            _ => {}
        }
    }
}

#[cfg(all(test, feature = "vector_index"))]
mod tests {
    use super::*;

    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}

/// Metrics of the parquet metadata cache.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of in-memory cache hits.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses.
    pub(crate) cache_miss: usize,
    /// Duration to load the metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations.
    pub(crate) num_reads: usize,
    /// Number of bytes read.
    pub(crate) bytes_read: u64,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }
        if *num_reads > 0 {
            write!(f, ", \"num_reads\":{}", num_reads)?;
        }
        if *bytes_read > 0 {
            write!(f, ", \"bytes_read\":{}", bytes_read)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// Returns `true` if no metadata has been loaded.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
        self.num_reads += other.num_reads;
        self.bytes_read += other.bytes_read;
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read from the file.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded from the record batches.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics of the metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Metrics of fetching parquet data.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Memory size of the parquet metadata.
    pub(crate) metadata_mem_size: isize,
    /// Number of range builders.
    pub(crate) num_range_builders: isize,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
        self.metadata_mem_size += other.metadata_mem_size;
        self.num_range_builders += other.num_range_builders;
    }

    /// Reports the number of rows read under the given read type.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle to avoid the file from being purged while reading.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read the file.
    object_store: ObjectStore,
    /// Projection mask of the columns to read.
    projection: ProjectionMask,
    /// Field levels to build the arrow reader.
    field_levels: FieldLevels,
    /// Cache strategy.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches the column data of the row group into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        // Builds the parquet reader from the in-memory row group.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted; keeps the collected metrics.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// A filter that may or may not need evaluation.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate against batches.
    Filter(SimpleFilterEvaluator),
    /// The filter always matches; no evaluation needed.
    Matched,
    /// The filter never matches; the rows are pruned.
    Pruned,
}

/// Context to evaluate a simple filter against a column.
pub(crate) struct SimpleFilterContext {
    /// The filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to filter.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
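    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the expression is not a simple filter or the column
    /// to filter does not exist in the metadata.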
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks whether the column is present in the SST metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // The column is not in the SST, so evaluates the filter
                        // against the column's default value.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

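/// Evaluates the filter against the column's default value. Returns
/// `Some(true)` if the default value never matches the filter, or `None` if
/// it cannot decide.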
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
    /// Metrics of fetching parquet data.
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // Reads the next batch from the current row group.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // The current row group is exhausted; advances to the next selected one.
        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
            let parquet_reader = self
                .context
                .reader_builder()
                .build(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                )
                .await?;

            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Reports the collected metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
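    /// Creates a new reader from the context and selection, positioned at the
    /// first selected row group.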
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        // Initializes the reader state from the first selected row group.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    /// Returns the parquet metadata of the file.
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

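/// Context shared by readers of a row group, mapping arrow results into this
/// crate's [Result] type.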
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

/// [RowGroupReader] that reads batches from a row group of a file range.
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader from a file range context.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}

/// Base reader that decodes [Batch]es from a parquet row group.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the reader.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // Creates a sequence array to override sequences, if necessary.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the inner reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // Fetches the next record batch and converts it into batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in the flat format.
pub(crate) struct FlatRowGroupReader {
    /// Context of the file range.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // Creates a sequence array to override sequences, if necessary.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next [RecordBatch] converted to the flat format.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Converts the record batch to the flat format.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}