1#[cfg(feature = "vector_index")]
18use std::collections::BTreeSet;
19use std::collections::{HashSet, VecDeque};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{tracing, warn};
26use datafusion_expr::Expr;
27use datatypes::arrow::array::ArrayRef;
28use datatypes::arrow::datatypes::Field;
29use datatypes::arrow::error::ArrowError;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::data_type::ConcreteDataType;
32use datatypes::prelude::DataType;
33use mito_codec::row_converter::build_primary_key_codec;
34use object_store::ObjectStore;
35use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
36use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
37use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
38use partition::expr::PartitionExpr;
39use snafu::ResultExt;
40use store_api::codec::PrimaryKeyEncoding;
41use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
42use store_api::region_request::PathType;
43use store_api::storage::{ColumnId, FileId};
44use table::predicate::Predicate;
45
46use crate::cache::index::result_cache::PredicateKey;
47use crate::cache::{CacheStrategy, CachedSstMeta};
48#[cfg(feature = "vector_index")]
49use crate::error::ApplyVectorIndexSnafu;
50use crate::error::{
51 ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu,
52};
53use crate::metrics::{
54 PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
55 READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
56};
57use crate::read::flat_projection::CompactionProjectionMapper;
58use crate::read::prune::FlatPruneReader;
59use crate::read::{Batch, BatchReader};
60use crate::sst::file::FileHandle;
61use crate::sst::index::bloom_filter::applier::{
62 BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
63};
64use crate::sst::index::fulltext_index::applier::{
65 FulltextIndexApplierRef, FulltextIndexApplyMetrics,
66};
67use crate::sst::index::inverted_index::applier::{
68 InvertedIndexApplierRef, InvertedIndexApplyMetrics,
69};
70#[cfg(feature = "vector_index")]
71use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
72use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
73use crate::sst::parquet::file_range::{
74 FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
75 row_group_contains_delete,
76};
77use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
78use crate::sst::parquet::metadata::MetadataLoader;
79use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
80use crate::sst::parquet::row_selection::RowGroupSelection;
81use crate::sst::parquet::stats::RowGroupPruningStats;
82use crate::sst::tag_maybe_to_dictionary_field;
83
/// Index type label for fulltext indexes, used in error logs and metrics.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
/// Index type label for inverted indexes, used in error logs and metrics.
const INDEX_TYPE_INVERTED: &str = "inverted";
/// Index type label for bloom filter indexes, used in error logs and metrics.
const INDEX_TYPE_BLOOM: &str = "bloom filter";
/// Index type label for vector indexes, used in error logs and metrics.
const INDEX_TYPE_VECTOR: &str = "vector";
88
/// Handles an error returned by an index applier: panics in test builds so
/// failures surface immediately, but only logs a warning otherwise — index
/// application is a best-effort optimization and callers fall back to reading
/// without the index result.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        // In tests (or with the `test` feature enabled) fail loudly.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
109
/// Builder of [ParquetReader] for a single SST file.
pub struct ParquetReaderBuilder {
    /// Directory of the table's data files.
    table_dir: String,
    /// Path type used to build the file path.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store that holds the file.
    object_store: ObjectStore,
    /// Predicate pushed down for pruning; `None` disables predicate pruning.
    predicate: Option<Predicate>,
    /// Ids of the columns to read; `None` reads all columns of the expected
    /// metadata (or the file's region metadata).
    projection: Option<Vec<ColumnId>>,
    /// Cache strategy; `new` defaults this to [CacheStrategy::Disabled].
    cache_strategy: CacheStrategy,
    /// Inverted index appliers; two slots — only the first slot is consulted
    /// when field columns are skipped (see the `prune_row_groups_by_*` methods).
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers; same two-slot layout as above.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers; same two-slot layout as above.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier of the vector index, if any.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// `k` passed to the vector index applier (`apply_with_k`).
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected (latest) metadata of the region; used for pruning and schema
    /// compatibility when it differs from the file's metadata.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to read in flat format.
    flat_format: bool,
    /// Whether this read serves a compaction.
    compaction: bool,
    /// Controls whether field-column filters participate in pre-filtering.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values while reading.
    decode_primary_key_values: bool,
    /// Policy for loading the parquet page index.
    page_index_policy: PageIndexPolicy,
}
151
152impl ParquetReaderBuilder {
153 pub fn new(
155 table_dir: String,
156 path_type: PathType,
157 file_handle: FileHandle,
158 object_store: ObjectStore,
159 ) -> ParquetReaderBuilder {
160 ParquetReaderBuilder {
161 table_dir,
162 path_type,
163 file_handle,
164 object_store,
165 predicate: None,
166 projection: None,
167 cache_strategy: CacheStrategy::Disabled,
168 inverted_index_appliers: [None, None],
169 bloom_filter_index_appliers: [None, None],
170 fulltext_index_appliers: [None, None],
171 #[cfg(feature = "vector_index")]
172 vector_index_applier: None,
173 #[cfg(feature = "vector_index")]
174 vector_index_k: None,
175 expected_metadata: None,
176 flat_format: false,
177 compaction: false,
178 pre_filter_mode: PreFilterMode::All,
179 decode_primary_key_values: false,
180 page_index_policy: Default::default(),
181 }
182 }
183
184 #[must_use]
186 pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
187 self.predicate = predicate;
188 self
189 }
190
191 #[must_use]
195 pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
196 self.projection = projection;
197 self
198 }
199
200 #[must_use]
202 pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
203 self.cache_strategy = cache;
204 self
205 }
206
207 #[must_use]
209 pub(crate) fn inverted_index_appliers(
210 mut self,
211 index_appliers: [Option<InvertedIndexApplierRef>; 2],
212 ) -> Self {
213 self.inverted_index_appliers = index_appliers;
214 self
215 }
216
217 #[must_use]
219 pub(crate) fn bloom_filter_index_appliers(
220 mut self,
221 index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
222 ) -> Self {
223 self.bloom_filter_index_appliers = index_appliers;
224 self
225 }
226
227 #[must_use]
229 pub(crate) fn fulltext_index_appliers(
230 mut self,
231 index_appliers: [Option<FulltextIndexApplierRef>; 2],
232 ) -> Self {
233 self.fulltext_index_appliers = index_appliers;
234 self
235 }
236
237 #[cfg(feature = "vector_index")]
239 #[must_use]
240 pub(crate) fn vector_index_applier(
241 mut self,
242 applier: Option<VectorIndexApplierRef>,
243 k: Option<usize>,
244 ) -> Self {
245 self.vector_index_applier = applier;
246 self.vector_index_k = k;
247 self
248 }
249
250 #[must_use]
252 pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
253 self.expected_metadata = expected_metadata;
254 self
255 }
256
257 #[must_use]
259 pub fn flat_format(mut self, flat_format: bool) -> Self {
260 self.flat_format = flat_format;
261 self
262 }
263
264 #[must_use]
266 pub fn compaction(mut self, compaction: bool) -> Self {
267 self.compaction = compaction;
268 self
269 }
270
271 #[must_use]
273 pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
274 self.pre_filter_mode = pre_filter_mode;
275 self
276 }
277
278 #[must_use]
280 pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
281 self.decode_primary_key_values = decode;
282 self
283 }
284
285 #[must_use]
286 pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
287 self.page_index_policy = page_index_policy;
288 self
289 }
290
291 #[tracing::instrument(
295 skip_all,
296 fields(
297 region_id = %self.file_handle.region_id(),
298 file_id = %self.file_handle.file_id()
299 )
300 )]
301 pub async fn build(&self) -> Result<Option<ParquetReader>> {
302 let mut metrics = ReaderMetrics::default();
303
304 let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await?
305 else {
306 return Ok(None);
307 };
308 ParquetReader::new(Arc::new(context), selection)
309 .await
310 .map(Some)
311 }
312
    /// Builds the reader input (file range context and row group selection)
    /// without constructing a [ParquetReader], honoring `self.flat_format`.
    ///
    /// Returns `Ok(None)` when every row group is pruned out.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        self.build_reader_input_inner(metrics, self.flat_format)
            .await
    }
330
331 async fn build_reader_input_inner(
332 &self,
333 metrics: &mut ReaderMetrics,
334 flat_format: bool,
335 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
336 let start = Instant::now();
337
338 let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
339 let file_size = self.file_handle.meta_ref().file_size;
340
341 let (sst_meta, cache_miss) = self
343 .read_parquet_metadata(
344 &file_path,
345 file_size,
346 &mut metrics.metadata_cache_metrics,
347 self.page_index_policy,
348 )
349 .await?;
350 let parquet_meta = sst_meta.parquet_metadata();
351 let region_meta = sst_meta.region_metadata();
352 let region_partition_expr_str = self
353 .expected_metadata
354 .as_ref()
355 .and_then(|meta| meta.partition_expr.as_ref())
356 .map(|expr| expr.as_str());
357 let (_, is_same_region_partition) = Self::is_same_region_partition(
358 region_partition_expr_str,
359 self.file_handle.meta_ref().partition_expr.as_ref(),
360 )?;
361 let skip_auto_convert = self.compaction && is_same_region_partition;
365
366 let compaction_projection_mapper = if self.compaction
375 && !is_same_region_partition
376 && flat_format
377 && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
378 {
379 Some(CompactionProjectionMapper::try_new(®ion_meta)?)
380 } else {
381 None
382 };
383
384 let mut read_format = if let Some(column_ids) = &self.projection {
385 ReadFormat::new(
386 region_meta.clone(),
387 Some(column_ids),
388 flat_format,
389 Some(parquet_meta.file_metadata().schema_descr().num_columns()),
390 &file_path,
391 skip_auto_convert,
392 )?
393 } else {
394 let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta);
396 let column_ids: Vec<_> = expected_meta
397 .column_metadatas
398 .iter()
399 .map(|col| col.column_id)
400 .collect();
401 ReadFormat::new(
402 region_meta.clone(),
403 Some(&column_ids),
404 flat_format,
405 Some(parquet_meta.file_metadata().schema_descr().num_columns()),
406 &file_path,
407 skip_auto_convert,
408 )?
409 };
410 if self.decode_primary_key_values {
411 read_format.set_decode_primary_key_values(true);
412 }
413 if need_override_sequence(&parquet_meta) {
414 read_format
415 .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
416 }
417
418 let selection = self
419 .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
420 .await;
421
422 if selection.is_empty() {
423 metrics.build_cost += start.elapsed();
424 return Ok(None);
425 }
426
427 if cache_miss && !selection.is_empty() {
429 use crate::cache::file_cache::{FileType, IndexKey};
430 let index_key = IndexKey::new(
431 self.file_handle.region_id(),
432 self.file_handle.file_id().file_id(),
433 FileType::Parquet,
434 );
435 self.cache_strategy.maybe_download_background(
436 index_key,
437 file_path.clone(),
438 self.object_store.clone(),
439 file_size,
440 );
441 }
442
443 let prune_schema = self
444 .expected_metadata
445 .as_ref()
446 .map(|meta| meta.schema.clone())
447 .unwrap_or_else(|| region_meta.schema.clone());
448
449 let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
451 let indices = read_format.projection_indices();
452 let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
455
456 let hint = Some(read_format.arrow_schema().fields());
458 let field_levels =
459 parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
460 .context(ReadDataPartSnafu)?;
461
462 let reader_builder = RowGroupReaderBuilder {
463 file_handle: self.file_handle.clone(),
464 file_path,
465 parquet_meta,
466 object_store: self.object_store.clone(),
467 projection: projection_mask,
468 field_levels,
469 cache_strategy: self.cache_strategy.clone(),
470 };
471
472 let filters = if let Some(predicate) = &self.predicate {
473 predicate
474 .exprs()
475 .iter()
476 .filter_map(|expr| {
477 SimpleFilterContext::new_opt(
478 ®ion_meta,
479 self.expected_metadata.as_deref(),
480 expr,
481 )
482 })
483 .collect::<Vec<_>>()
484 } else {
485 vec![]
486 };
487
488 let dyn_filters = if let Some(predicate) = &self.predicate {
489 predicate.dyn_filters().as_ref().clone()
490 } else {
491 vec![]
492 };
493
494 let codec = build_primary_key_codec(read_format.metadata());
495
496 let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
497
498 let context = FileRangeContext::new(
499 reader_builder,
500 RangeBase {
501 filters,
502 dyn_filters,
503 read_format,
504 expected_metadata: self.expected_metadata.clone(),
505 prune_schema,
506 codec,
507 compat_batch: None,
508 compaction_projection_mapper,
509 pre_filter_mode: self.pre_filter_mode,
510 partition_filter,
511 },
512 );
513
514 metrics.build_cost += start.elapsed();
515
516 Ok(Some((context, selection)))
517 }
518
519 fn is_same_region_partition(
520 region_partition_expr_str: Option<&str>,
521 file_partition_expr: Option<&PartitionExpr>,
522 ) -> Result<(Option<PartitionExpr>, bool)> {
523 let region_partition_expr = match region_partition_expr_str {
524 Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
525 None => None,
526 };
527
528 let is_same = region_partition_expr.as_ref() == file_partition_expr;
529 Ok((region_partition_expr, is_same))
530 }
531
    /// Builds an optional [PartitionFilterContext] used to filter out rows that
    /// do not satisfy the region's current partition expression.
    ///
    /// Returns `Ok(None)` when the file's partition expression already matches
    /// the region's, or when the region has no partition expression.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        // Same partition: every row in the file already belongs to the region.
        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Restrict the filter schema to the columns the expression references.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // In flat format, string tag columns may be stored as dictionary
        // arrays, so the filter schema must use the dictionary data type.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }
601
    /// Reads parquet metadata of the SST file, consulting the metadata cache
    /// first.
    ///
    /// Returns the metadata and a flag that is `true` on a cache miss (i.e. the
    /// metadata was loaded from the object store), `false` on a cache hit.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Fast path: cached metadata.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss: load the metadata from the object store.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        // Cache the freshly loaded metadata for subsequent reads.
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }
641
    /// Computes the row groups (and rows within them) to read by combining
    /// min-max pruning with all available secondary indexes, cheapest first.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Uses the first row group's row count as the uniform row-group size.
        // NOTE(review): assumes all row groups except possibly the last share
        // this size — confirm against the SST writer.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        // Whether filters on field columns may be skipped for this read.
        let skip_fields = self.compute_skip_fields(parquet_meta);

        // Min-max statistics pruning runs first.
        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Only fall back to the coarse fulltext (bloom) index when the fine
        // fulltext index did not already filter.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }
749
    /// Prunes row groups/rows with the fine-grained fulltext index.
    ///
    /// Returns whether any applier result (fresh or cached) was applied to
    /// `output`.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is consulted when fields are skipped.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse a cached result that already covers every row
            // group still required.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            // Slow path: apply the index, then cache and apply the result.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                // `None`: the index produced no applicable result.
                Ok(None) => continue,
                Err(err) => {
                    // Index errors are non-fatal; skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
819
    /// Prunes row groups/rows with the inverted index.
    ///
    /// Returns whether any applier result (fresh or cached) was applied to
    /// `output`.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is consulted when fields are skipped.
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse a cached result that already covers every row
            // group still required.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.inverted_index_cache_miss += 1;
            // Slow path: apply the index, then cache and apply the result.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    // Index errors are non-fatal; skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }
894
    /// Prunes row groups/rows with the bloom filter index.
    ///
    /// Returns whether any applier result (fresh or cached) was applied to
    /// `output`.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is consulted when fields are skipped.
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Fast path: the cached result covers all required row groups.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // For each row group: (row count, whether to search it). A group is
            // searched only if still selected and not covered by the cache.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    // Index errors are non-fatal; skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Merge the fresh result with previously cached row groups so the
            // cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }
979
    /// Prunes rows with the vector index: keeps only rows whose offsets appear
    /// in the applier's top-`k` result.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        // Both an applier and a `k` must be configured, and the file must
        // actually carry a vector index.
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                // Non-fatal: leave `output` untouched on error.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }
1022
    /// Prunes row groups with the coarse (bloom-based) fulltext index; used as
    /// a fallback when the fine fulltext index did not filter.
    ///
    /// Returns whether any applier result (fresh or cached) was applied to
    /// `output`.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is consulted when fields are skipped.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: reuse a cached result that covers all required groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // For each row group: (row count, whether to search it). A group is
            // searched only if still selected and not covered by the cache.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                // `None`: the index produced no applicable result.
                Ok(None) => continue,
                Err(err) => {
                    // Index errors are non-fatal; skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Merge the fresh result with previously cached row groups so the
            // cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
1108
1109 fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
1111 match self.pre_filter_mode {
1112 PreFilterMode::All => false,
1113 PreFilterMode::SkipFields => true,
1114 PreFilterMode::SkipFieldsOnDelete => {
1115 let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
1117 (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1118 row_group_contains_delete(parquet_meta, rg_idx, &file_path)
1119 .inspect_err(|e| {
1120 warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
1121 })
1122 .unwrap_or(false)
1123 })
1124 }
1125 }
1126 }
1127
    /// Prunes row groups by min-max statistics, caching the result keyed by
    /// the sorted, stringified predicate expressions.
    fn row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        // No predicate: everything is selected.
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        // Results involving dynamic filters are runtime-dependent and thus not
        // cacheable; only build a cache key for purely static predicates.
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Sort the stringified exprs so the key is order-insensitive.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        // Fast path: return the cached selection if present.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        // Prune against the expected (latest) schema when provided.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // `mask[i]` tells whether row group `i` may contain matching rows.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        // Cache the freshly computed selection for subsequent reads.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }
1220
1221 fn apply_index_result_and_update_cache(
1222 &self,
1223 predicate_key: &PredicateKey,
1224 file_id: FileId,
1225 result: RowGroupSelection,
1226 output: &mut RowGroupSelection,
1227 metrics: &mut ReaderFilterMetrics,
1228 index_type: &str,
1229 ) {
1230 apply_selection_and_update_metrics(output, &result, metrics, index_type);
1231
1232 if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1233 index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1234 }
1235 }
1236}
1237fn apply_selection_and_update_metrics(
1238 output: &mut RowGroupSelection,
1239 result: &RowGroupSelection,
1240 metrics: &mut ReaderFilterMetrics,
1241 index_type: &str,
1242) {
1243 let intersection = output.intersect(result);
1244
1245 let row_group_count = output.row_group_count() - intersection.row_group_count();
1246 let row_count = output.row_count() - intersection.row_count();
1247
1248 metrics.update_index_metrics(index_type, row_group_count, row_count);
1249
1250 *output = intersection;
1251}
1252
#[cfg(feature = "vector_index")]
/// Converts absolute row offsets returned by the vector index into a
/// [RowGroupSelection].
///
/// Fails when an offset does not fit into `u32` (row ids are 32-bit).
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<_>>>()?;

    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1275
1276fn all_required_row_groups_searched(
1277 required_row_groups: &RowGroupSelection,
1278 cached_row_groups: &RowGroupSelection,
1279) -> bool {
1280 required_row_groups.iter().all(|(rg_id, _)| {
1281 !required_row_groups.contains_non_empty_row_group(*rg_id)
1283 || cached_row_groups.contains_row_group(*rg_id)
1285 })
1286}
1287
/// Metrics collected while filtering row groups and rows of one or more SST
/// files. Counters are grouped by filter source (fulltext / inverted / bloom /
/// vector / min-max) and by granularity (row groups vs. rows); `observe()`
/// flushes them to the global Prometheus counters.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Total number of row groups before any filtering.
    pub(crate) rg_total: usize,
    /// Row groups filtered out by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Row groups filtered out by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Row groups filtered out by min-max (statistics) pruning.
    pub(crate) rg_minmax_filtered: usize,
    /// Row groups filtered out by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Row groups filtered out by the vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Total number of rows before any filtering.
    pub(crate) rows_total: usize,
    /// Rows filtered out by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Rows filtered out by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Rows filtered out by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Rows filtered out by the vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Rows the vector index selected (kept). Not exported by `observe()`.
    pub(crate) rows_vector_selected: usize,
    /// Rows removed by precise (row-level) filtering on decoded data.
    pub(crate) rows_precise_filtered: usize,

    /// Fulltext index result cache hits.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Fulltext index result cache misses.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Inverted index result cache hits.
    pub(crate) inverted_index_cache_hit: usize,
    /// Inverted index result cache misses.
    pub(crate) inverted_index_cache_miss: usize,
    /// Bloom filter index result cache hits.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Bloom filter index result cache misses.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Min-max pruning result cache hits.
    pub(crate) minmax_cache_hit: usize,
    /// Min-max pruning result cache misses.
    pub(crate) minmax_cache_miss: usize,

    /// Detailed apply metrics of the inverted index, if any were collected.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed apply metrics of the bloom filter index, if any were collected.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed apply metrics of the fulltext index, if any were collected.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    // NOTE(review): pruner counters are only merged here; presumably they come
    // from a cached pruner elsewhere — confirm against the producer.
    pub(crate) pruner_cache_hit: usize,
    pub(crate) pruner_cache_miss: usize,
    /// Time spent in pruner evaluation.
    pub(crate) pruner_prune_cost: Duration,
}
1350
1351impl ReaderFilterMetrics {
1352 pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1354 self.rg_total += other.rg_total;
1355 self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1356 self.rg_inverted_filtered += other.rg_inverted_filtered;
1357 self.rg_minmax_filtered += other.rg_minmax_filtered;
1358 self.rg_bloom_filtered += other.rg_bloom_filtered;
1359 self.rg_vector_filtered += other.rg_vector_filtered;
1360
1361 self.rows_total += other.rows_total;
1362 self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1363 self.rows_inverted_filtered += other.rows_inverted_filtered;
1364 self.rows_bloom_filtered += other.rows_bloom_filtered;
1365 self.rows_vector_filtered += other.rows_vector_filtered;
1366 self.rows_vector_selected += other.rows_vector_selected;
1367 self.rows_precise_filtered += other.rows_precise_filtered;
1368
1369 self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1370 self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1371 self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1372 self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1373 self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1374 self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1375 self.minmax_cache_hit += other.minmax_cache_hit;
1376 self.minmax_cache_miss += other.minmax_cache_miss;
1377
1378 self.pruner_cache_hit += other.pruner_cache_hit;
1379 self.pruner_cache_miss += other.pruner_cache_miss;
1380 self.pruner_prune_cost += other.pruner_prune_cost;
1381
1382 if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1384 self.inverted_index_apply_metrics
1385 .get_or_insert_with(Default::default)
1386 .merge_from(other_metrics);
1387 }
1388 if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1389 self.bloom_filter_apply_metrics
1390 .get_or_insert_with(Default::default)
1391 .merge_from(other_metrics);
1392 }
1393 if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1394 self.fulltext_index_apply_metrics
1395 .get_or_insert_with(Default::default)
1396 .merge_from(other_metrics);
1397 }
1398 }
1399
1400 pub(crate) fn observe(&self) {
1402 READ_ROW_GROUPS_TOTAL
1403 .with_label_values(&["before_filtering"])
1404 .inc_by(self.rg_total as u64);
1405 READ_ROW_GROUPS_TOTAL
1406 .with_label_values(&["fulltext_index_filtered"])
1407 .inc_by(self.rg_fulltext_filtered as u64);
1408 READ_ROW_GROUPS_TOTAL
1409 .with_label_values(&["inverted_index_filtered"])
1410 .inc_by(self.rg_inverted_filtered as u64);
1411 READ_ROW_GROUPS_TOTAL
1412 .with_label_values(&["minmax_index_filtered"])
1413 .inc_by(self.rg_minmax_filtered as u64);
1414 READ_ROW_GROUPS_TOTAL
1415 .with_label_values(&["bloom_filter_index_filtered"])
1416 .inc_by(self.rg_bloom_filtered as u64);
1417 READ_ROW_GROUPS_TOTAL
1418 .with_label_values(&["vector_index_filtered"])
1419 .inc_by(self.rg_vector_filtered as u64);
1420
1421 PRECISE_FILTER_ROWS_TOTAL
1422 .with_label_values(&["parquet"])
1423 .inc_by(self.rows_precise_filtered as u64);
1424 READ_ROWS_IN_ROW_GROUP_TOTAL
1425 .with_label_values(&["before_filtering"])
1426 .inc_by(self.rows_total as u64);
1427 READ_ROWS_IN_ROW_GROUP_TOTAL
1428 .with_label_values(&["fulltext_index_filtered"])
1429 .inc_by(self.rows_fulltext_filtered as u64);
1430 READ_ROWS_IN_ROW_GROUP_TOTAL
1431 .with_label_values(&["inverted_index_filtered"])
1432 .inc_by(self.rows_inverted_filtered as u64);
1433 READ_ROWS_IN_ROW_GROUP_TOTAL
1434 .with_label_values(&["bloom_filter_index_filtered"])
1435 .inc_by(self.rows_bloom_filtered as u64);
1436 READ_ROWS_IN_ROW_GROUP_TOTAL
1437 .with_label_values(&["vector_index_filtered"])
1438 .inc_by(self.rows_vector_filtered as u64);
1439 }
1440
1441 fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1442 match index_type {
1443 INDEX_TYPE_FULLTEXT => {
1444 self.rg_fulltext_filtered += row_group_count;
1445 self.rows_fulltext_filtered += row_count;
1446 }
1447 INDEX_TYPE_INVERTED => {
1448 self.rg_inverted_filtered += row_group_count;
1449 self.rows_inverted_filtered += row_count;
1450 }
1451 INDEX_TYPE_BLOOM => {
1452 self.rg_bloom_filtered += row_group_count;
1453 self.rows_bloom_filtered += row_count;
1454 }
1455 INDEX_TYPE_VECTOR => {
1456 self.rg_vector_filtered += row_group_count;
1457 self.rows_vector_filtered += row_count;
1458 }
1459 _ => {}
1460 }
1461 }
1462}
1463
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    // With row_group_size = 4: offsets 0 and 1 fall in row group 0,
    // offset 5 in row group 1, and offset 9 in row group 2.
    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        // All three row groups are touched and four rows are selected in total.
        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    // Offsets larger than u32::MAX cannot be represented as row ids and must
    // produce an error.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    // Intersecting a full 8-row selection with a single-row vector result
    // should report 1 filtered row group and 7 filtered rows.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1515
/// Counters describing how parquet metadata was obtained (cache hits vs.
/// actual loads) and the I/O spent loading it.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Metadata served from the in-memory cache.
    pub(crate) mem_cache_hit: usize,
    /// Metadata served from the file cache.
    pub(crate) file_cache_hit: usize,
    /// Metadata that was not found in any cache.
    pub(crate) cache_miss: usize,
    /// Total time spent loading metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations issued while loading metadata.
    pub(crate) num_reads: usize,
    /// Number of bytes read while loading metadata.
    pub(crate) bytes_read: u64,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    /// Renders the metrics as a compact JSON-like object, always printing the
    /// load cost and omitting zero-valued counters; an empty instance prints
    /// as `{}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            return write!(f, "{{}}");
        }

        write!(f, "{{")?;
        write!(f, "\"metadata_load_cost\":\"{:?}\"", self.metadata_load_cost)?;
        if self.mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", self.mem_cache_hit)?;
        }
        if self.file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", self.file_cache_hit)?;
        }
        if self.cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", self.cache_miss)?;
        }
        if self.num_reads > 0 {
            write!(f, ", \"num_reads\":{}", self.num_reads)?;
        }
        if self.bytes_read > 0 {
            write!(f, ", \"bytes_read\":{}", self.bytes_read)?;
        }
        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// True when no metadata load time has been recorded.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Accumulates `other` into `self` field by field.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        // The struct is `Copy`, so destructure by value.
        let MetadataCacheMetrics {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *other;
        self.mem_cache_hit += mem_cache_hit;
        self.file_cache_hit += file_cache_hit;
        self.cache_miss += cache_miss;
        self.metadata_load_cost += metadata_load_cost;
        self.num_reads += num_reads;
        self.bytes_read += bytes_read;
    }
}
1587
/// Metrics of a parquet reader: filtering counters plus scan/build costs and
/// batch/row counts.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Row-group / row filtering counters.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Time spent building the reader.
    pub(crate) build_cost: Duration,
    /// Time spent scanning (decoding and converting batches).
    pub(crate) scan_cost: Duration,
    /// Number of raw parquet record batches decoded.
    pub(crate) num_record_batches: usize,
    /// Number of output batches produced after conversion.
    pub(crate) num_batches: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Counters about how parquet metadata was obtained.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Data-fetch metrics shared with row-group readers, if collected.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    // NOTE(review): signed — presumably a delta that can go negative when
    // cached metadata is released; confirm against the producer.
    pub(crate) metadata_mem_size: isize,
    // NOTE(review): signed for the same reason as above — confirm.
    pub(crate) num_range_builders: isize,
}
1612
1613impl ReaderMetrics {
1614 pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1616 self.filter_metrics.merge_from(&other.filter_metrics);
1617 self.build_cost += other.build_cost;
1618 self.scan_cost += other.scan_cost;
1619 self.num_record_batches += other.num_record_batches;
1620 self.num_batches += other.num_batches;
1621 self.num_rows += other.num_rows;
1622 self.metadata_cache_metrics
1623 .merge_from(&other.metadata_cache_metrics);
1624 if let Some(other_fetch) = &other.fetch_metrics {
1625 if let Some(self_fetch) = &self.fetch_metrics {
1626 self_fetch.merge_from(other_fetch);
1627 } else {
1628 self.fetch_metrics = Some(other_fetch.clone());
1629 }
1630 }
1631 self.metadata_mem_size += other.metadata_mem_size;
1632 self.num_range_builders += other.num_range_builders;
1633 }
1634
1635 pub(crate) fn observe_rows(&self, read_type: &str) {
1637 READ_ROWS_TOTAL
1638 .with_label_values(&[read_type])
1639 .inc_by(self.num_rows as u64);
1640 }
1641}
1642
/// Builder that creates a [ParquetRecordBatchReader] for one row group of an
/// SST file, fetching the required column chunks through the cache strategy.
pub(crate) struct RowGroupReaderBuilder {
    /// Handle of the SST file being read.
    file_handle: FileHandle,
    /// Full path of the file in the object store.
    file_path: String,
    /// Parsed parquet metadata of the file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store to read data from.
    object_store: ObjectStore,
    /// Columns to project when decoding.
    projection: ProjectionMask,
    /// Pre-computed arrow field levels for decoding.
    field_levels: FieldLevels,
    /// Cache strategy used when fetching row-group data.
    cache_strategy: CacheStrategy,
}
1662
impl RowGroupReaderBuilder {
    /// Path of the file this builder reads.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file this builder reads.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Parquet metadata of the file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy used when fetching data.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] for the row group at
    /// `row_group_idx`, optionally restricted to `row_selection`.
    ///
    /// Fetches the projected column chunks into memory first (recording the
    /// elapsed time in `fetch_metrics` when provided), then constructs the
    /// decoder over the in-memory data.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetch only the projected columns; the row selection lets the fetch
        // skip pages that are entirely filtered out.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Record the total fetch time before building the decoder.
        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}
1726
/// A filter to evaluate row by row, or an outcome already decided for the
/// whole file (see [SimpleFilterContext::new_opt]).
pub(crate) enum MaybeFilter {
    /// The filter must be evaluated against the data.
    Filter(SimpleFilterEvaluator),
    /// The column is absent from the SST and its default value satisfies the
    /// filter, so every row matches.
    Matched,
    /// The column is absent from the SST and its default value fails the
    /// filter, so the whole file can be pruned.
    Pruned,
}
1736
/// A simple filter bound to the column it applies to.
pub(crate) struct SimpleFilterContext {
    /// The filter (or the pre-decided outcome) for the column.
    filter: MaybeFilter,
    /// Id of the column the filter references.
    column_id: ColumnId,
    /// Semantic type (tag/field/timestamp) of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
}
1748
1749impl SimpleFilterContext {
1750 pub(crate) fn new_opt(
1755 sst_meta: &RegionMetadataRef,
1756 expected_meta: Option<&RegionMetadata>,
1757 expr: &Expr,
1758 ) -> Option<Self> {
1759 let filter = SimpleFilterEvaluator::try_new(expr)?;
1760 let (column_metadata, maybe_filter) = match expected_meta {
1761 Some(meta) => {
1762 let column = meta.column_by_name(filter.column_name())?;
1764 match sst_meta.column_by_id(column.column_id) {
1767 Some(sst_column) => {
1768 debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1769
1770 (column, MaybeFilter::Filter(filter))
1771 }
1772 None => {
1773 if pruned_by_default(&filter, column)? {
1777 (column, MaybeFilter::Pruned)
1778 } else {
1779 (column, MaybeFilter::Matched)
1780 }
1781 }
1782 }
1783 }
1784 None => {
1785 let column = sst_meta.column_by_name(filter.column_name())?;
1786 (column, MaybeFilter::Filter(filter))
1787 }
1788 };
1789
1790 Some(Self {
1791 filter: maybe_filter,
1792 column_id: column_metadata.column_id,
1793 semantic_type: column_metadata.semantic_type,
1794 data_type: column_metadata.column_schema.data_type.clone(),
1795 })
1796 }
1797
1798 pub(crate) fn filter(&self) -> &MaybeFilter {
1800 &self.filter
1801 }
1802
1803 pub(crate) fn column_id(&self) -> ColumnId {
1805 self.column_id
1806 }
1807
1808 pub(crate) fn semantic_type(&self) -> SemanticType {
1810 self.semantic_type
1811 }
1812
1813 pub(crate) fn data_type(&self) -> &ConcreteDataType {
1815 &self.data_type
1816 }
1817}
1818
1819fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1822 let value = column.column_schema.create_default().ok().flatten()?;
1823 let scalar_value = value
1824 .try_to_scalar_value(&column.column_schema.data_type)
1825 .ok()?;
1826 let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1827 Some(!matches)
1828}
1829
/// Reader that scans the selected row groups of a parquet SST file and yields
/// converted [RecordBatch]es (flat read format only).
pub struct ParquetReader {
    /// Shared file-range context (reader builder, read format, filters).
    context: FileRangeContextRef,
    /// Remaining row groups (with their row selections) to read.
    selection: RowGroupSelection,
    /// Reader over the row group currently being scanned; `None` when the
    /// current group is exhausted or nothing is open yet.
    reader: Option<FlatPruneReader>,
    /// Data-fetch metrics, passed to every row-group reader built.
    fetch_metrics: ParquetFetchMetrics,
}
1841
1842impl ParquetReader {
1843 #[tracing::instrument(
1844 skip_all,
1845 fields(
1846 region_id = %self.context.reader_builder().file_handle.region_id(),
1847 file_id = %self.context.reader_builder().file_handle.file_id()
1848 )
1849 )]
1850 pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
1851 loop {
1852 if let Some(reader) = &mut self.reader {
1853 if let Some(batch) = reader.next_batch()? {
1854 return Ok(Some(batch));
1855 }
1856 self.reader = None;
1857 continue;
1858 }
1859
1860 let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
1861 return Ok(None);
1862 };
1863
1864 let parquet_reader = self
1865 .context
1866 .reader_builder()
1867 .build(
1868 row_group_idx,
1869 Some(row_selection),
1870 Some(&self.fetch_metrics),
1871 )
1872 .await?;
1873
1874 let skip_fields = self.context.should_skip_fields(row_group_idx);
1875 self.reader = Some(FlatPruneReader::new_with_row_group_reader(
1876 self.context.clone(),
1877 FlatRowGroupReader::new(self.context.clone(), parquet_reader),
1878 skip_fields,
1879 ));
1880 }
1881 }
1882 #[tracing::instrument(
1884 skip_all,
1885 fields(
1886 region_id = %context.reader_builder().file_handle.region_id(),
1887 file_id = %context.reader_builder().file_handle.file_id()
1888 )
1889 )]
1890 pub(crate) async fn new(
1891 context: FileRangeContextRef,
1892 mut selection: RowGroupSelection,
1893 ) -> Result<Self> {
1894 debug_assert!(context.read_format().as_flat().is_some());
1895 let fetch_metrics = ParquetFetchMetrics::default();
1896 let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
1897 let parquet_reader = context
1898 .reader_builder()
1899 .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
1900 .await?;
1901 let skip_fields = context.should_skip_fields(row_group_idx);
1902 Some(FlatPruneReader::new_with_row_group_reader(
1903 context.clone(),
1904 FlatRowGroupReader::new(context.clone(), parquet_reader),
1905 skip_fields,
1906 ))
1907 } else {
1908 None
1909 };
1910
1911 Ok(ParquetReader {
1912 context,
1913 selection,
1914 reader,
1915 fetch_metrics,
1916 })
1917 }
1918
1919 pub fn metadata(&self) -> &RegionMetadataRef {
1921 self.context.read_format().metadata()
1922 }
1923
1924 pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
1925 self.context.reader_builder().parquet_meta.clone()
1926 }
1927}
1928
/// Context a [RowGroupReaderBase] needs to decode and convert record batches.
pub(crate) trait RowGroupReaderContext: Send {
    /// Maps an arrow-level read result into this crate's [Result], attaching
    /// error context (e.g. the file path).
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    /// Read format used to convert decoded record batches.
    fn read_format(&self) -> &ReadFormat;
}
1939
impl RowGroupReaderContext for FileRangeContextRef {
    /// Attaches the file path so arrow errors can be traced back to the SST.
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    /// Forwards to the context's read format.
    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}
1954
/// Row-group reader specialized for the file-range context (primary key
/// format).
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
    /// Creates a new reader; see [RowGroupReaderBase::create].
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        Self::create(context, reader)
    }
}
1964
/// Reads [Batch]es from a single row group, converting raw record batches
/// through the context's read format.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context providing the read format and error mapping.
    context: T,
    /// Decoder over the row group's data.
    reader: ParquetRecordBatchReader,
    /// Converted batches buffered and waiting to be returned.
    batches: VecDeque<Batch>,
    /// Local metrics of this reader.
    metrics: ReaderMetrics,
    /// Optional sequence array used to override sequences during conversion.
    override_sequence: Option<ArrayRef>,
}
1978
1979impl<T> RowGroupReaderBase<T>
1980where
1981 T: RowGroupReaderContext,
1982{
1983 pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
1985 let override_sequence = context
1987 .read_format()
1988 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
1989 assert!(context.read_format().as_primary_key().is_some());
1990
1991 Self {
1992 context,
1993 reader,
1994 batches: VecDeque::new(),
1995 metrics: ReaderMetrics::default(),
1996 override_sequence,
1997 }
1998 }
1999
2000 pub(crate) fn metrics(&self) -> &ReaderMetrics {
2002 &self.metrics
2003 }
2004
2005 pub(crate) fn read_format(&self) -> &ReadFormat {
2007 self.context.read_format()
2008 }
2009
2010 fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2012 self.context.map_result(self.reader.next().transpose())
2013 }
2014
2015 pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
2017 let scan_start = Instant::now();
2018 if let Some(batch) = self.batches.pop_front() {
2019 self.metrics.num_rows += batch.num_rows();
2020 self.metrics.scan_cost += scan_start.elapsed();
2021 return Ok(Some(batch));
2022 }
2023
2024 while self.batches.is_empty() {
2026 let Some(record_batch) = self.fetch_next_record_batch()? else {
2027 self.metrics.scan_cost += scan_start.elapsed();
2028 return Ok(None);
2029 };
2030 self.metrics.num_record_batches += 1;
2031
2032 self.context
2034 .read_format()
2035 .as_primary_key()
2036 .unwrap()
2037 .convert_record_batch(
2038 &record_batch,
2039 self.override_sequence.as_ref(),
2040 &mut self.batches,
2041 )?;
2042 self.metrics.num_batches += self.batches.len();
2043 }
2044 let batch = self.batches.pop_front();
2045 self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
2046 self.metrics.scan_cost += scan_start.elapsed();
2047 Ok(batch)
2048 }
2049}
2050
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Delegates to the synchronous [RowGroupReaderBase::next_inner].
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}
2060
/// Reads and converts record batches of one row group in the flat format.
pub(crate) struct FlatRowGroupReader {
    /// Context providing the flat read format and the file path for errors.
    context: FileRangeContextRef,
    /// Decoder over the row group's data.
    reader: ParquetRecordBatchReader,
    /// Optional sequence array used to override sequences during conversion.
    override_sequence: Option<ArrayRef>,
}
2070
2071impl FlatRowGroupReader {
2072 pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
2074 let override_sequence = context
2076 .read_format()
2077 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2078
2079 Self {
2080 context,
2081 reader,
2082 override_sequence,
2083 }
2084 }
2085
2086 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2088 match self.reader.next() {
2089 Some(batch_result) => {
2090 let record_batch = batch_result.context(ArrowReaderSnafu {
2091 path: self.context.file_path(),
2092 })?;
2093
2094 let flat_format = self.context.read_format().as_flat().unwrap();
2096 let record_batch =
2097 flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
2098 Ok(Some(record_batch))
2099 }
2100 None => Ok(None),
2101 }
2102 }
2103}
2104
2105#[cfg(test)]
2106mod tests {
2107 use std::any::Any;
2108 use std::fmt::{Debug, Formatter};
2109 use std::sync::{Arc, LazyLock};
2110
2111 use datafusion::arrow::datatypes::DataType;
2112 use datafusion_common::ScalarValue;
2113 use datafusion_expr::expr::ScalarFunction;
2114 use datafusion_expr::{
2115 ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2116 };
2117 use datatypes::arrow::array::{ArrayRef, Int64Array};
2118 use datatypes::arrow::record_batch::RecordBatch;
2119 use object_store::services::Memory;
2120 use parquet::arrow::ArrowWriter;
2121 use store_api::region_request::PathType;
2122 use table::predicate::Predicate;
2123
2124 use super::*;
2125 use crate::sst::parquet::metadata::MetadataLoader;
2126 use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2127
2128 #[tokio::test(flavor = "current_thread")]
2129 async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2130 #[derive(Eq, PartialEq, Hash)]
2131 struct PanicDebugUdf;
2132
2133 impl Debug for PanicDebugUdf {
2134 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2135 panic!("minmax predicate key should not format exprs when cache is disabled");
2136 }
2137 }
2138
2139 impl ScalarUDFImpl for PanicDebugUdf {
2140 fn as_any(&self) -> &dyn Any {
2141 self
2142 }
2143
2144 fn name(&self) -> &str {
2145 "panic_debug_udf"
2146 }
2147
2148 fn signature(&self) -> &Signature {
2149 static SIGNATURE: LazyLock<Signature> =
2150 LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2151 &SIGNATURE
2152 }
2153
2154 fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2155 Ok(DataType::Int64)
2156 }
2157
2158 fn invoke_with_args(
2159 &self,
2160 _args: ScalarFunctionArgs,
2161 ) -> datafusion_common::Result<ColumnarValue> {
2162 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2163 }
2164 }
2165
2166 let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2167 let file_handle = sst_file_handle(0, 1);
2168 let table_dir = "test_table".to_string();
2169 let path_type = PathType::Bare;
2170 let file_path = file_handle.file_path(&table_dir, path_type);
2171
2172 let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2173 let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2174 let mut parquet_bytes = Vec::new();
2175 let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2176 writer.write(&batch).unwrap();
2177 writer.close().unwrap();
2178 let file_size = parquet_bytes.len() as u64;
2179 object_store.write(&file_path, parquet_bytes).await.unwrap();
2180
2181 let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2182 let read_format =
2183 ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();
2184
2185 let mut cache_metrics = MetadataCacheMetrics::default();
2186 let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2187 let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2188
2189 let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2190 let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2191 udf,
2192 vec![],
2193 ))]);
2194 let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2195 .predicate(Some(predicate))
2196 .cache(CacheStrategy::Disabled);
2197
2198 let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2199 let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2200 let mut metrics = ReaderFilterMetrics::default();
2201 let selection = builder.row_groups_by_minmax(
2202 &read_format,
2203 &parquet_meta,
2204 row_group_size,
2205 total_row_count,
2206 &mut metrics,
2207 false,
2208 );
2209
2210 assert!(!selection.is_empty());
2211 }
2212}