1mod stream;
18
19#[cfg(feature = "vector_index")]
20use std::collections::BTreeSet;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{error, tracing, warn};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::{Expr, Volatility};
32use datatypes::arrow::array::ArrayRef;
33use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::data_type::ConcreteDataType;
36use datatypes::prelude::DataType;
37use datatypes::schema::ext::ArrowSchemaExt;
38use futures::StreamExt;
39use mito_codec::row_converter::build_primary_key_codec;
40use object_store::ObjectStore;
41use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
42use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema};
43use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
44use partition::expr::PartitionExpr;
45use snafu::ResultExt;
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
48use store_api::region_request::PathType;
49use store_api::storage::{ColumnId, FileId};
50use table::predicate::Predicate;
51
52use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream};
53use crate::cache::index::result_cache::PredicateKey;
54use crate::cache::{CacheStrategy, CachedSstMeta};
55#[cfg(feature = "vector_index")]
56use crate::error::ApplyVectorIndexSnafu;
57use crate::error::{
58 ParquetToArrowSchemaSnafu, ReadDataPartSnafu, Result, SerializePartitionExprSnafu,
59};
60use crate::metrics::{
61 PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
62 READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
63};
64use crate::read::flat_projection::CompactionProjectionMapper;
65use crate::read::prune::FlatPruneReader;
66use crate::read::read_columns::ReadColumns;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69 BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
70};
71use crate::sst::index::fulltext_index::applier::{
72 FulltextIndexApplierRef, FulltextIndexApplyMetrics,
73};
74use crate::sst::index::inverted_index::applier::{
75 InvertedIndexApplierRef, InvertedIndexApplyMetrics,
76};
77#[cfg(feature = "vector_index")]
78use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
79use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
80use crate::sst::parquet::file_range::{
81 FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
82};
83use crate::sst::parquet::flat_format::FlatReadFormat;
84use crate::sst::parquet::format::need_override_sequence;
85use crate::sst::parquet::metadata::MetadataLoader;
86use crate::sst::parquet::prefilter::{
87 PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
88};
89use crate::sst::parquet::push_decoder::{
90 SstParquetRangeFetcher, build_sst_parquet_record_batch_stream,
91};
92use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
93use crate::sst::parquet::row_group::ParquetFetchMetrics;
94use crate::sst::parquet::row_selection::RowGroupSelection;
95use crate::sst::parquet::stats::RowGroupPruningStats;
96use crate::sst::tag_maybe_to_dictionary_field;
97
/// Label of the fulltext index, used in `handle_index_error!` messages and passed
/// as the `index_type` to `apply_selection_and_update_metrics`.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
/// Label of the inverted index, used the same way as [`INDEX_TYPE_FULLTEXT`].
const INDEX_TYPE_INVERTED: &str = "inverted";
/// Label of the bloom filter index, used the same way as [`INDEX_TYPE_FULLTEXT`].
const INDEX_TYPE_BLOOM: &str = "bloom filter";
/// Label of the vector index, used the same way as [`INDEX_TYPE_FULLTEXT`].
const INDEX_TYPE_VECTOR: &str = "vector";
102
/// Reports a failure to apply an index to an SST file.
///
/// In test builds (`cfg(test)` or the `test` feature) this panics so the failure is
/// visible; otherwise it logs a warning carrying the error. Callers in this module
/// then skip the failing index and keep their current selection.
macro_rules! handle_index_error {
    ($error:expr, $handle:expr, $kind:expr) => {
        if !cfg!(any(test, feature = "test")) {
            warn!(
                $error; "Failed to apply {} index, region_id: {}, file_id: {}",
                $kind,
                $handle.region_id(),
                $handle.file_id()
            );
        } else {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $kind,
                $handle.region_id(),
                $handle.file_id(),
                $error
            );
        }
    };
}
123
/// Builder for a reader over a single parquet SST file.
///
/// Options are set with the chained setters. `build` (or `build_reader_input`)
/// then loads the file metadata, prunes row groups with min-max statistics and
/// the configured indexes, and assembles the reader context.
pub struct ParquetReaderBuilder {
    /// Table directory; combined with `path_type` to resolve the SST path.
    table_dir: String,
    /// Path layout passed to `FileHandle::file_path`.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store the file and its metadata are read from.
    object_store: ObjectStore,
    /// Optional predicate used for min-max pruning and row filtering.
    predicate: Option<Predicate>,
    /// Columns to read. `None` reads every column of `expected_metadata`, or of the
    /// SST's own region metadata when no expected metadata is set.
    read_cols: Option<ReadColumns>,
    /// Cache strategy for SST metadata, index results and background downloads.
    cache_strategy: CacheStrategy,
    /// Inverted index appliers. Only the first entry is consulted when the
    /// pre-filter mode skips fields. NOTE(review): presumably the second entry
    /// targets field columns; confirm against the caller.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers; same two-slot convention as the inverted index.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers; same two-slot convention as the inverted index.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Optional vector index applier.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// `k` passed to the vector index applier's `apply_with_k`.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Region metadata the caller expects. When set, it takes precedence over the
    /// SST's own metadata for the default projection, prune schema, schema
    /// version and partition checks.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this read serves a compaction. It affects auto-conversion skipping
    /// and the sparse-encoding projection mapper.
    compaction: bool,
    /// Pre-filter mode. It decides whether field columns are skipped during
    /// pruning and is passed to the filter plan.
    pre_filter_mode: PreFilterMode,
    /// Whether primary key values should be decoded. NOTE(review): not read in
    /// this part of the file; confirm how it is consumed.
    decode_primary_key_values: bool,
    /// Page index policy used when loading parquet metadata.
    page_index_policy: PageIndexPolicy,
    /// When true and `page_index_policy` is `Optional`, metadata is first loaded
    /// without the page index. The page index is reloaded only if a prefilter is
    /// planned or the selection is finer than whole row groups.
    defer_optional_page_index: bool,
}
164
165impl ParquetReaderBuilder {
166 pub fn new(
168 table_dir: String,
169 path_type: PathType,
170 file_handle: FileHandle,
171 object_store: ObjectStore,
172 ) -> ParquetReaderBuilder {
173 ParquetReaderBuilder {
174 table_dir,
175 path_type,
176 file_handle,
177 object_store,
178 predicate: None,
179 read_cols: None,
180 cache_strategy: CacheStrategy::Disabled,
181 inverted_index_appliers: [None, None],
182 bloom_filter_index_appliers: [None, None],
183 fulltext_index_appliers: [None, None],
184 #[cfg(feature = "vector_index")]
185 vector_index_applier: None,
186 #[cfg(feature = "vector_index")]
187 vector_index_k: None,
188 expected_metadata: None,
189 compaction: false,
190 pre_filter_mode: PreFilterMode::All,
191 decode_primary_key_values: false,
192 page_index_policy: Default::default(),
193 defer_optional_page_index: false,
194 }
195 }
196
197 #[must_use]
199 pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
200 self.predicate = predicate;
201 self
202 }
203
204 #[must_use]
208 pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
209 self.read_cols = read_cols;
210 self
211 }
212
213 #[must_use]
215 pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
216 self.cache_strategy = cache;
217 self
218 }
219
220 #[must_use]
222 pub(crate) fn inverted_index_appliers(
223 mut self,
224 index_appliers: [Option<InvertedIndexApplierRef>; 2],
225 ) -> Self {
226 self.inverted_index_appliers = index_appliers;
227 self
228 }
229
230 #[must_use]
232 pub(crate) fn bloom_filter_index_appliers(
233 mut self,
234 index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
235 ) -> Self {
236 self.bloom_filter_index_appliers = index_appliers;
237 self
238 }
239
240 #[must_use]
242 pub(crate) fn fulltext_index_appliers(
243 mut self,
244 index_appliers: [Option<FulltextIndexApplierRef>; 2],
245 ) -> Self {
246 self.fulltext_index_appliers = index_appliers;
247 self
248 }
249
250 #[cfg(feature = "vector_index")]
252 #[must_use]
253 pub(crate) fn vector_index_applier(
254 mut self,
255 applier: Option<VectorIndexApplierRef>,
256 k: Option<usize>,
257 ) -> Self {
258 self.vector_index_applier = applier;
259 self.vector_index_k = k;
260 self
261 }
262
263 #[must_use]
265 pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
266 self.expected_metadata = expected_metadata;
267 self
268 }
269
270 #[must_use]
272 pub fn compaction(mut self, compaction: bool) -> Self {
273 self.compaction = compaction;
274 self
275 }
276
277 #[must_use]
279 pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
280 self.pre_filter_mode = pre_filter_mode;
281 self
282 }
283
284 #[must_use]
286 pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
287 self.decode_primary_key_values = decode;
288 self
289 }
290
291 #[must_use]
292 pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
293 self.page_index_policy = page_index_policy;
294 self
295 }
296
297 #[must_use]
299 pub(crate) fn deferred_optional_page_index(mut self) -> Self {
300 self.page_index_policy = PageIndexPolicy::Optional;
301 self.defer_optional_page_index = true;
302 self
303 }
304
305 #[tracing::instrument(
309 skip_all,
310 fields(
311 region_id = %self.file_handle.region_id(),
312 file_id = %self.file_handle.file_id()
313 )
314 )]
315 pub async fn build(&self) -> Result<Option<ParquetReader>> {
316 let mut metrics = ReaderMetrics::default();
317
318 let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
319 return Ok(None);
320 };
321 ParquetReader::new(Arc::new(context), selection)
322 .await
323 .map(Some)
324 }
325
326 #[tracing::instrument(
330 skip_all,
331 fields(
332 region_id = %self.file_handle.region_id(),
333 file_id = %self.file_handle.file_id()
334 )
335 )]
336 pub(crate) async fn build_reader_input(
337 &self,
338 metrics: &mut ReaderMetrics,
339 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
340 self.build_reader_input_inner(metrics).await
341 }
342
343 async fn build_reader_input_inner(
344 &self,
345 metrics: &mut ReaderMetrics,
346 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
347 let start = Instant::now();
348
349 let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
350 let file_size = self.file_handle.meta_ref().file_size;
351
352 let initial_page_index_policy = if self.defer_optional_page_index
354 && self.page_index_policy == PageIndexPolicy::Optional
355 {
356 PageIndexPolicy::Skip
357 } else {
358 self.page_index_policy
359 };
360 let (sst_meta, mut cache_miss) = self
361 .read_parquet_metadata(
362 &file_path,
363 file_size,
364 &mut metrics.metadata_cache_metrics,
365 initial_page_index_policy,
366 )
367 .await?;
368 let mut parquet_meta = sst_meta.parquet_metadata();
369 let region_meta = sst_meta.region_metadata();
370 let region_partition_expr_str = self
371 .expected_metadata
372 .as_ref()
373 .and_then(|meta| meta.partition_expr.as_ref())
374 .map(|expr| expr.as_str());
375 let (_, is_same_region_partition) = Self::is_same_region_partition(
376 region_partition_expr_str,
377 self.file_handle.meta_ref().partition_expr.as_ref(),
378 )?;
379 let skip_auto_convert = self.compaction && is_same_region_partition;
383
384 let compaction_projection_mapper = if self.compaction
393 && !is_same_region_partition
394 && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
395 {
396 Some(CompactionProjectionMapper::try_new(®ion_meta)?)
397 } else {
398 None
399 };
400
401 let read_cols = if let Some(read_cols) = &self.read_cols {
402 read_cols.clone()
403 } else {
404 let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta);
405 ReadColumns::from_deduped_column_ids(
407 expected_meta
408 .column_metadatas
409 .iter()
410 .map(|col| col.column_id),
411 )
412 };
413
414 let file_metadata = parquet_meta.file_metadata();
415 let parquet_schema_desc = file_metadata.schema_descr();
416 let file_schema =
417 parquet_to_arrow_schema(parquet_schema_desc, file_metadata.key_value_metadata())
418 .context(ParquetToArrowSchemaSnafu { file: &file_path })?;
419 let mut read_format = FlatReadFormat::new(
420 region_meta.clone(),
421 read_cols,
422 Some(Arc::new(file_schema)),
423 &file_path,
424 skip_auto_convert,
425 )?;
426 if need_override_sequence(&parquet_meta) {
427 read_format
428 .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
429 }
430
431 let parquet_read_cols = read_format.parquet_read_columns();
433 let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
434 let selection = self
435 .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
436 .await;
437
438 if selection.is_empty() {
439 metrics.build_cost += start.elapsed();
440 return Ok(None);
441 }
442
443 let prune_schema = self
444 .expected_metadata
445 .as_ref()
446 .map(|meta| meta.schema.clone())
447 .unwrap_or_else(|| region_meta.schema.clone());
448
449 let dyn_filters = if let Some(predicate) = &self.predicate {
450 predicate.dyn_filters().as_ref().clone()
451 } else {
452 vec![]
453 };
454
455 let codec = build_primary_key_codec(read_format.metadata());
456
457 let filter_plan = build_reader_filter_plan(
458 self.predicate.as_ref(),
459 self.expected_metadata.as_deref(),
460 self.pre_filter_mode,
461 &read_format,
462 &codec,
463 );
464
465 if self.defer_optional_page_index
466 && self.page_index_policy == PageIndexPolicy::Optional
467 && (filter_plan.prefilter_builder.is_some()
468 || has_row_level_selection(&selection, &parquet_meta))
469 {
470 let (sst_meta, page_index_cache_miss) = self
471 .read_parquet_metadata(
472 &file_path,
473 file_size,
474 &mut metrics.metadata_cache_metrics,
475 PageIndexPolicy::Optional,
476 )
477 .await?;
478 parquet_meta = sst_meta.parquet_metadata();
479 cache_miss |= page_index_cache_miss;
480 }
481
482 if cache_miss && !selection.is_empty() {
484 use crate::cache::file_cache::{FileType, IndexKey};
485 let index_key = IndexKey::new(
486 self.file_handle.region_id(),
487 self.file_handle.file_id().file_id(),
488 FileType::Parquet,
489 );
490 self.cache_strategy.maybe_download_background(
491 index_key,
492 file_path.clone(),
493 self.object_store.clone(),
494 file_size,
495 );
496 }
497
498 let mut arrow_reader_options = ArrowReaderOptions::new();
500 if !read_format.arrow_schema().has_json_extension_field() {
501 arrow_reader_options =
502 arrow_reader_options.with_schema(read_format.arrow_schema().clone());
503 }
504 let arrow_metadata =
505 ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
506 .context(ReadDataPartSnafu)?;
507
508 let output_schema = read_format.output_arrow_schema()?;
509
510 let reader_builder = RowGroupReaderBuilder {
511 file_handle: self.file_handle.clone(),
512 file_path,
513 parquet_meta,
514 arrow_metadata,
515 output_schema,
516 object_store: self.object_store.clone(),
517 projection: projection_plan,
518 has_nested_projection: parquet_read_cols.has_nested(),
519 cache_strategy: self.cache_strategy.clone(),
520 prefilter_builder: filter_plan.prefilter_builder,
521 };
522
523 let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
524
525 let context = FileRangeContext::new(
526 reader_builder,
527 RangeBase {
528 filters: filter_plan.remaining_simple_filters,
529 dyn_filters,
530 read_format,
531 expected_metadata: self.expected_metadata.clone(),
532 prune_schema,
533 codec,
534 compat_batch: None,
535 compaction_projection_mapper,
536 pre_filter_mode: self.pre_filter_mode,
537 partition_filter,
538 },
539 );
540
541 metrics.build_cost += start.elapsed();
542
543 Ok(Some((context, selection)))
544 }
545
546 fn is_same_region_partition(
547 region_partition_expr_str: Option<&str>,
548 file_partition_expr: Option<&PartitionExpr>,
549 ) -> Result<(Option<PartitionExpr>, bool)> {
550 let region_partition_expr = match region_partition_expr_str {
551 Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
552 None => None,
553 };
554
555 let is_same = region_partition_expr.as_ref() == file_partition_expr;
556 Ok((region_partition_expr, is_same))
557 }
558
559 fn build_partition_filter(
562 &self,
563 read_format: &FlatReadFormat,
564 prune_schema: &Arc<datatypes::schema::Schema>,
565 ) -> Result<Option<PartitionFilterContext>> {
566 let region_partition_expr_str = self
567 .expected_metadata
568 .as_ref()
569 .and_then(|meta| meta.partition_expr.as_ref());
570 let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
571
572 let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
573 region_partition_expr_str.map(|s| s.as_str()),
574 file_partition_expr_ref,
575 )?;
576
577 if is_same_region_partition {
578 return Ok(None);
579 }
580
581 let Some(region_partition_expr) = region_partition_expr else {
582 return Ok(None);
583 };
584
585 let mut referenced_columns = HashSet::new();
587 region_partition_expr.collect_column_names(&mut referenced_columns);
588
589 let partition_schema = Arc::new(datatypes::schema::Schema::new(
591 prune_schema
592 .column_schemas()
593 .iter()
594 .filter(|col| referenced_columns.contains(&col.name))
595 .map(|col| {
596 if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
597 && column_meta.semantic_type == SemanticType::Tag
598 && col.data_type.is_string()
599 {
600 let field = Arc::new(Field::new(
601 &col.name,
602 col.data_type.as_arrow_type(),
603 col.is_nullable(),
604 ));
605 let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
606 let mut column = col.clone();
607 column.data_type =
608 ConcreteDataType::from_arrow_type(dict_field.data_type());
609 return column;
610 }
611
612 col.clone()
613 })
614 .collect::<Vec<_>>(),
615 ));
616
617 let region_partition_physical_expr = region_partition_expr
618 .try_as_physical_expr(partition_schema.arrow_schema())
619 .context(SerializePartitionExprSnafu)?;
620
621 Ok(Some(PartitionFilterContext {
622 region_partition_physical_expr,
623 partition_schema,
624 }))
625 }
626
627 pub(crate) async fn read_parquet_metadata(
630 &self,
631 file_path: &str,
632 file_size: u64,
633 cache_metrics: &mut MetadataCacheMetrics,
634 page_index_policy: PageIndexPolicy,
635 ) -> Result<(Arc<CachedSstMeta>, bool)> {
636 let start = Instant::now();
637 let _t = READ_STAGE_ELAPSED
638 .with_label_values(&["read_parquet_metadata"])
639 .start_timer();
640
641 let file_id = self.file_handle.file_id();
642 if let Some(metadata) = self
644 .cache_strategy
645 .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
646 .await
647 {
648 cache_metrics.metadata_load_cost += start.elapsed();
649 return Ok((metadata, false));
650 }
651
652 let mut metadata_loader =
654 MetadataLoader::new(self.object_store.clone(), file_path, file_size);
655 metadata_loader.with_page_index_policy(page_index_policy);
656 let metadata = metadata_loader.load(cache_metrics).await?;
657
658 let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
659 file_path,
660 metadata,
661 None,
662 page_index_policy,
663 )?);
664 self.cache_strategy
666 .put_sst_meta_data(file_id, metadata.clone());
667
668 cache_metrics.metadata_load_cost += start.elapsed();
669 Ok((metadata, true))
670 }
671
672 #[tracing::instrument(
674 skip_all,
675 fields(
676 region_id = %self.file_handle.region_id(),
677 file_id = %self.file_handle.file_id()
678 )
679 )]
680 async fn row_groups_to_read(
681 &self,
682 read_format: &FlatReadFormat,
683 parquet_meta: &ParquetMetaData,
684 metrics: &mut ReaderFilterMetrics,
685 ) -> RowGroupSelection {
686 let num_row_groups = parquet_meta.num_row_groups();
687 let num_rows = parquet_meta.file_metadata().num_rows();
688 if num_row_groups == 0 || num_rows == 0 {
689 return RowGroupSelection::default();
690 }
691
692 let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
695 if row_group_size == 0 {
696 return RowGroupSelection::default();
697 }
698
699 metrics.rg_total += num_row_groups;
700 metrics.rows_total += num_rows as usize;
701
702 let skip_fields = self.pre_filter_mode.skip_fields();
704
705 let mut output = self.row_groups_by_minmax(
706 read_format,
707 parquet_meta,
708 row_group_size,
709 num_rows as usize,
710 metrics,
711 skip_fields,
712 );
713 if output.is_empty() {
714 return output;
715 }
716
717 let fulltext_filtered = self
718 .prune_row_groups_by_fulltext_index(
719 row_group_size,
720 num_row_groups,
721 &mut output,
722 metrics,
723 skip_fields,
724 )
725 .await;
726 if output.is_empty() {
727 return output;
728 }
729
730 self.prune_row_groups_by_inverted_index(
731 read_format.metadata(),
732 row_group_size,
733 num_row_groups,
734 &mut output,
735 metrics,
736 skip_fields,
737 )
738 .await;
739 if output.is_empty() {
740 return output;
741 }
742
743 self.prune_row_groups_by_bloom_filter(
744 read_format.metadata(),
745 row_group_size,
746 parquet_meta,
747 &mut output,
748 metrics,
749 skip_fields,
750 )
751 .await;
752 if output.is_empty() {
753 return output;
754 }
755
756 if !fulltext_filtered {
757 self.prune_row_groups_by_fulltext_bloom(
758 row_group_size,
759 parquet_meta,
760 &mut output,
761 metrics,
762 skip_fields,
763 )
764 .await;
765 }
766 #[cfg(feature = "vector_index")]
767 {
768 self.prune_row_groups_by_vector_index(
769 row_group_size,
770 num_row_groups,
771 &mut output,
772 metrics,
773 )
774 .await;
775 if output.is_empty() {
776 return output;
777 }
778 }
779 output
780 }
781
782 async fn prune_row_groups_by_fulltext_index(
784 &self,
785 row_group_size: usize,
786 num_row_groups: usize,
787 output: &mut RowGroupSelection,
788 metrics: &mut ReaderFilterMetrics,
789 skip_fields: bool,
790 ) -> bool {
791 if !self.file_handle.meta_ref().fulltext_index_available() {
792 return false;
793 }
794
795 let mut pruned = false;
796 let appliers = if skip_fields {
798 &self.fulltext_index_appliers[..1]
799 } else {
800 &self.fulltext_index_appliers[..]
801 };
802 for index_applier in appliers.iter().flatten() {
803 let predicate_key = index_applier.predicate_key();
804 let cached = self
806 .cache_strategy
807 .index_result_cache()
808 .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
809 if let Some(result) = cached.as_ref()
810 && all_required_row_groups_searched(output, result)
811 {
812 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
813 metrics.fulltext_index_cache_hit += 1;
814 pruned = true;
815 continue;
816 }
817
818 metrics.fulltext_index_cache_miss += 1;
820 let file_size_hint = self.file_handle.meta_ref().index_file_size();
821 let apply_res = index_applier
822 .apply_fine(
823 self.file_handle.index_id(),
824 Some(file_size_hint),
825 metrics.fulltext_index_apply_metrics.as_mut(),
826 )
827 .await;
828 let selection = match apply_res {
829 Ok(Some(res)) => {
830 RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
831 }
832 Ok(None) => continue,
833 Err(err) => {
834 handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
835 continue;
836 }
837 };
838
839 self.apply_index_result_and_update_cache(
840 predicate_key,
841 self.file_handle.file_id().file_id(),
842 selection,
843 output,
844 metrics,
845 INDEX_TYPE_FULLTEXT,
846 );
847 pruned = true;
848 }
849 pruned
850 }
851
852 async fn prune_row_groups_by_inverted_index(
858 &self,
859 sst_metadata: &RegionMetadataRef,
860 row_group_size: usize,
861 num_row_groups: usize,
862 output: &mut RowGroupSelection,
863 metrics: &mut ReaderFilterMetrics,
864 skip_fields: bool,
865 ) -> bool {
866 if !self.file_handle.meta_ref().inverted_index_available() {
867 return false;
868 }
869
870 let mut pruned = false;
871 let appliers = if skip_fields {
873 &self.inverted_index_appliers[..1]
874 } else {
875 &self.inverted_index_appliers[..]
876 };
877 for index_applier in appliers.iter().flatten() {
878 let Ok(Some(plan)) = index_applier
879 .plan_for_sst(sst_metadata)
880 .inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
881 else {
882 continue;
883 };
884
885 let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
887 let file_id = self.file_handle.file_id().file_id();
888 cache.get(&plan.predicate_key, file_id)
889 });
890
891 if let Some(result) = cached.as_ref()
892 && all_required_row_groups_searched(output, result)
893 {
894 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
895 metrics.inverted_index_cache_hit += 1;
896 pruned = true;
897 continue;
898 }
899
900 metrics.inverted_index_cache_miss += 1;
902 let file_size_hint = self.file_handle.meta_ref().index_file_size();
903 let apply_res = index_applier
904 .apply(
905 self.file_handle.index_id(),
906 Some(file_size_hint),
907 &plan.index_applier,
908 metrics.inverted_index_apply_metrics.as_mut(),
909 )
910 .await;
911
912 let selection = match apply_res {
913 Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
914 row_group_size,
915 num_row_groups,
916 apply_output,
917 ),
918 Err(err) => {
919 handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
920 continue;
921 }
922 };
923
924 self.apply_index_result_and_update_cache(
925 &plan.predicate_key,
926 self.file_handle.file_id().file_id(),
927 selection,
928 output,
929 metrics,
930 INDEX_TYPE_INVERTED,
931 );
932 pruned = true;
933 }
934 pruned
935 }
936
937 async fn prune_row_groups_by_bloom_filter(
938 &self,
939 sst_metadata: &RegionMetadataRef,
940 row_group_size: usize,
941 parquet_meta: &ParquetMetaData,
942 output: &mut RowGroupSelection,
943 metrics: &mut ReaderFilterMetrics,
944 skip_fields: bool,
945 ) -> bool {
946 if !self.file_handle.meta_ref().bloom_filter_index_available() {
947 return false;
948 }
949
950 let mut pruned = false;
951 let appliers = if skip_fields {
953 &self.bloom_filter_index_appliers[..1]
954 } else {
955 &self.bloom_filter_index_appliers[..]
956 };
957 for index_applier in appliers.iter().flatten() {
958 let Some(compatible_predicates) =
959 index_applier.compatible_predicate_for_sst(sst_metadata)
960 else {
961 continue;
962 };
963 let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone());
964 let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
966 let file_id = self.file_handle.file_id().file_id();
967 cache.get(&predicate_key, file_id)
968 });
969 if let Some(result) = cached.as_ref()
970 && all_required_row_groups_searched(output, result)
971 {
972 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
973 metrics.bloom_filter_cache_hit += 1;
974 pruned = true;
975 continue;
976 }
977
978 metrics.bloom_filter_cache_miss += 1;
980 let file_size_hint = self.file_handle.meta_ref().index_file_size();
981 let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
982 (
983 rg.num_rows() as usize,
984 output.contains_non_empty_row_group(i)
986 && cached
987 .as_ref()
988 .map(|c| !c.contains_row_group(i))
989 .unwrap_or(true),
990 )
991 });
992 let apply_res = index_applier
993 .apply(
994 self.file_handle.index_id(),
995 Some(file_size_hint),
996 &compatible_predicates,
997 rgs,
998 metrics.bloom_filter_apply_metrics.as_mut(),
999 )
1000 .await;
1001 let mut selection = match apply_res {
1002 Ok(apply_output) => {
1003 RowGroupSelection::from_row_ranges(apply_output, row_group_size)
1004 }
1005 Err(err) => {
1006 handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
1007 continue;
1008 }
1009 };
1010
1011 if let Some(cached) = cached.as_ref() {
1013 selection.concat(cached);
1014 }
1015
1016 self.apply_index_result_and_update_cache(
1017 &predicate_key,
1018 self.file_handle.file_id().file_id(),
1019 selection,
1020 output,
1021 metrics,
1022 INDEX_TYPE_BLOOM,
1023 );
1024 pruned = true;
1025 }
1026 pruned
1027 }
1028
1029 #[cfg(feature = "vector_index")]
1031 async fn prune_row_groups_by_vector_index(
1032 &self,
1033 row_group_size: usize,
1034 num_row_groups: usize,
1035 output: &mut RowGroupSelection,
1036 metrics: &mut ReaderFilterMetrics,
1037 ) {
1038 let Some(applier) = &self.vector_index_applier else {
1039 return;
1040 };
1041 let Some(k) = self.vector_index_k else {
1042 return;
1043 };
1044 if !self.file_handle.meta_ref().vector_index_available() {
1045 return;
1046 }
1047
1048 let file_size_hint = self.file_handle.meta_ref().index_file_size();
1049 let apply_res = applier
1050 .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
1051 .await;
1052 let row_ids = match apply_res {
1053 Ok(res) => res.row_offsets,
1054 Err(err) => {
1055 handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1056 return;
1057 }
1058 };
1059
1060 let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
1061 {
1062 Ok(selection) => selection,
1063 Err(err) => {
1064 handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
1065 return;
1066 }
1067 };
1068 metrics.rows_vector_selected += selection.row_count();
1069 apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
1070 }
1071
1072 async fn prune_row_groups_by_fulltext_bloom(
1073 &self,
1074 row_group_size: usize,
1075 parquet_meta: &ParquetMetaData,
1076 output: &mut RowGroupSelection,
1077 metrics: &mut ReaderFilterMetrics,
1078 skip_fields: bool,
1079 ) -> bool {
1080 if !self.file_handle.meta_ref().fulltext_index_available() {
1081 return false;
1082 }
1083
1084 let mut pruned = false;
1085 let appliers = if skip_fields {
1087 &self.fulltext_index_appliers[..1]
1088 } else {
1089 &self.fulltext_index_appliers[..]
1090 };
1091 for index_applier in appliers.iter().flatten() {
1092 let predicate_key = index_applier.predicate_key();
1093 let cached = self
1095 .cache_strategy
1096 .index_result_cache()
1097 .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
1098 if let Some(result) = cached.as_ref()
1099 && all_required_row_groups_searched(output, result)
1100 {
1101 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
1102 metrics.fulltext_index_cache_hit += 1;
1103 pruned = true;
1104 continue;
1105 }
1106
1107 metrics.fulltext_index_cache_miss += 1;
1109 let file_size_hint = self.file_handle.meta_ref().index_file_size();
1110 let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
1111 (
1112 rg.num_rows() as usize,
1113 output.contains_non_empty_row_group(i)
1115 && cached
1116 .as_ref()
1117 .map(|c| !c.contains_row_group(i))
1118 .unwrap_or(true),
1119 )
1120 });
1121 let apply_res = index_applier
1122 .apply_coarse(
1123 self.file_handle.index_id(),
1124 Some(file_size_hint),
1125 rgs,
1126 metrics.fulltext_index_apply_metrics.as_mut(),
1127 )
1128 .await;
1129 let mut selection = match apply_res {
1130 Ok(Some(apply_output)) => {
1131 RowGroupSelection::from_row_ranges(apply_output, row_group_size)
1132 }
1133 Ok(None) => continue,
1134 Err(err) => {
1135 handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
1136 continue;
1137 }
1138 };
1139
1140 if let Some(cached) = cached.as_ref() {
1142 selection.concat(cached);
1143 }
1144
1145 self.apply_index_result_and_update_cache(
1146 predicate_key,
1147 self.file_handle.file_id().file_id(),
1148 selection,
1149 output,
1150 metrics,
1151 INDEX_TYPE_FULLTEXT,
1152 );
1153 pruned = true;
1154 }
1155 pruned
1156 }
1157
1158 fn row_groups_by_minmax(
1160 &self,
1161 read_format: &FlatReadFormat,
1162 parquet_meta: &ParquetMetaData,
1163 row_group_size: usize,
1164 total_row_count: usize,
1165 metrics: &mut ReaderFilterMetrics,
1166 skip_fields: bool,
1167 ) -> RowGroupSelection {
1168 let Some(predicate) = &self.predicate else {
1169 return RowGroupSelection::new(row_group_size, total_row_count);
1170 };
1171
1172 let file_id = self.file_handle.file_id().file_id();
1173 let index_result_cache = self.cache_strategy.index_result_cache();
1174 let cached_minmax_key =
1175 if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
1176 let mut exprs = predicate
1179 .exprs()
1180 .iter()
1181 .map(|expr| format!("{expr:?}"))
1182 .collect::<Vec<_>>();
1183 exprs.sort();
1184 let schema_version = self
1185 .expected_metadata
1186 .as_ref()
1187 .map(|meta| meta.schema_version)
1188 .unwrap_or_else(|| read_format.metadata().schema_version);
1189 Some(PredicateKey::new_minmax(
1190 Arc::new(exprs),
1191 schema_version,
1192 skip_fields,
1193 ))
1194 } else {
1195 None
1196 };
1197
1198 if let Some(index_result_cache) = index_result_cache
1199 && let Some(predicate_key) = cached_minmax_key.as_ref()
1200 {
1201 if let Some(result) = index_result_cache.get(predicate_key, file_id) {
1202 metrics.minmax_cache_hit += 1;
1203 let num_row_groups = parquet_meta.num_row_groups();
1204 metrics.rg_minmax_filtered +=
1205 num_row_groups.saturating_sub(result.row_group_count());
1206 return (*result).clone();
1207 }
1208
1209 metrics.minmax_cache_miss += 1;
1210 }
1211
1212 let region_meta = read_format.metadata();
1213 let row_groups = parquet_meta.row_groups();
1214 let stats = RowGroupPruningStats::new(
1215 row_groups,
1216 read_format,
1217 self.expected_metadata.clone(),
1218 skip_fields,
1219 );
1220 let prune_schema = self
1221 .expected_metadata
1222 .as_ref()
1223 .map(|meta| meta.schema.arrow_schema())
1224 .unwrap_or_else(|| region_meta.schema.arrow_schema());
1225
1226 let mask = predicate.prune_with_stats(&stats, prune_schema);
1230 let output = RowGroupSelection::from_full_row_group_ids(
1231 mask.iter()
1232 .enumerate()
1233 .filter_map(|(row_group, keep)| keep.then_some(row_group)),
1234 row_group_size,
1235 total_row_count,
1236 );
1237
1238 metrics.rg_minmax_filtered += parquet_meta
1239 .num_row_groups()
1240 .saturating_sub(output.row_group_count());
1241
1242 if let Some(index_result_cache) = index_result_cache
1243 && let Some(predicate_key) = cached_minmax_key
1244 {
1245 index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
1246 }
1247
1248 output
1249 }
1250
1251 fn apply_index_result_and_update_cache(
1252 &self,
1253 predicate_key: &PredicateKey,
1254 file_id: FileId,
1255 result: RowGroupSelection,
1256 output: &mut RowGroupSelection,
1257 metrics: &mut ReaderFilterMetrics,
1258 index_type: &str,
1259 ) {
1260 apply_selection_and_update_metrics(output, &result, metrics, index_type);
1261
1262 if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1263 index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1264 }
1265 }
1266}
1267
1268fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
1269 selection.iter().any(|(row_group_idx, row_selection)| {
1270 let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else {
1271 return false;
1272 };
1273
1274 row_selection.row_count() != row_group.num_rows() as usize
1275 || row_selection.iter().any(|selector| selector.skip)
1276 })
1277}
1278
1279fn apply_selection_and_update_metrics(
1280 output: &mut RowGroupSelection,
1281 result: &RowGroupSelection,
1282 metrics: &mut ReaderFilterMetrics,
1283 index_type: &str,
1284) {
1285 let intersection = output.intersect(result);
1286
1287 let row_group_count = output.row_group_count() - intersection.row_group_count();
1288 let row_count = output.row_count() - intersection.row_count();
1289
1290 metrics.update_index_metrics(index_type, row_group_count, row_count);
1291
1292 *output = intersection;
1293}
1294
/// Builds a row selection from the row offsets returned by the vector index.
///
/// Offsets are deduplicated and sorted (via `BTreeSet`) before being mapped onto row groups
/// of `row_group_size` rows.
///
/// # Errors
/// Returns an `ApplyVectorIndex` error for the first offset that does not fit in a `u32`.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<u32>>>()?;

    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1317
1318fn all_required_row_groups_searched(
1319 required_row_groups: &RowGroupSelection,
1320 cached_row_groups: &RowGroupSelection,
1321) -> bool {
1322 required_row_groups.iter().all(|(rg_id, _)| {
1323 !required_row_groups.contains_non_empty_row_group(*rg_id)
1325 || cached_row_groups.contains_row_group(*rg_id)
1327 })
1328}
1329
/// Counters describing how the row groups and rows of an SST were pruned by each
/// filtering stage (indexes, min/max statistics and precise filtering).
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min/max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by the vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row groups before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by the vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by the vector index.
    /// NOTE(review): updated outside this chunk; confirm exact semantics.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows removed by precise filtering; reported under the `parquet` label.
    pub(crate) rows_precise_filtered: usize,

    /// Fulltext index cache hits.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Fulltext index cache misses.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Inverted index cache hits.
    pub(crate) inverted_index_cache_hit: usize,
    /// Inverted index cache misses.
    pub(crate) inverted_index_cache_miss: usize,
    /// Bloom filter cache hits.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Bloom filter cache misses.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Min/max pruning results served from the index result cache.
    pub(crate) minmax_cache_hit: usize,
    /// Min/max pruning lookups that missed the index result cache.
    pub(crate) minmax_cache_miss: usize,

    /// Detailed metrics of applying the inverted index, if recorded.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed metrics of applying the bloom filter index, if recorded.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed metrics of applying the fulltext index, if recorded.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Pruner cache hits. NOTE(review): maintained outside this chunk.
    pub(crate) pruner_cache_hit: usize,
    /// Pruner cache misses. NOTE(review): maintained outside this chunk.
    pub(crate) pruner_cache_miss: usize,
    /// Time spent in the pruner. NOTE(review): maintained outside this chunk.
    pub(crate) pruner_prune_cost: Duration,
}
1392
1393impl ReaderFilterMetrics {
1394 pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1396 self.rg_total += other.rg_total;
1397 self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1398 self.rg_inverted_filtered += other.rg_inverted_filtered;
1399 self.rg_minmax_filtered += other.rg_minmax_filtered;
1400 self.rg_bloom_filtered += other.rg_bloom_filtered;
1401 self.rg_vector_filtered += other.rg_vector_filtered;
1402
1403 self.rows_total += other.rows_total;
1404 self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1405 self.rows_inverted_filtered += other.rows_inverted_filtered;
1406 self.rows_bloom_filtered += other.rows_bloom_filtered;
1407 self.rows_vector_filtered += other.rows_vector_filtered;
1408 self.rows_vector_selected += other.rows_vector_selected;
1409 self.rows_precise_filtered += other.rows_precise_filtered;
1410
1411 self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1412 self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1413 self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1414 self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1415 self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1416 self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1417 self.minmax_cache_hit += other.minmax_cache_hit;
1418 self.minmax_cache_miss += other.minmax_cache_miss;
1419
1420 self.pruner_cache_hit += other.pruner_cache_hit;
1421 self.pruner_cache_miss += other.pruner_cache_miss;
1422 self.pruner_prune_cost += other.pruner_prune_cost;
1423
1424 if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1426 self.inverted_index_apply_metrics
1427 .get_or_insert_with(Default::default)
1428 .merge_from(other_metrics);
1429 }
1430 if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1431 self.bloom_filter_apply_metrics
1432 .get_or_insert_with(Default::default)
1433 .merge_from(other_metrics);
1434 }
1435 if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1436 self.fulltext_index_apply_metrics
1437 .get_or_insert_with(Default::default)
1438 .merge_from(other_metrics);
1439 }
1440 }
1441
1442 pub(crate) fn observe(&self) {
1444 READ_ROW_GROUPS_TOTAL
1445 .with_label_values(&["before_filtering"])
1446 .inc_by(self.rg_total as u64);
1447 READ_ROW_GROUPS_TOTAL
1448 .with_label_values(&["fulltext_index_filtered"])
1449 .inc_by(self.rg_fulltext_filtered as u64);
1450 READ_ROW_GROUPS_TOTAL
1451 .with_label_values(&["inverted_index_filtered"])
1452 .inc_by(self.rg_inverted_filtered as u64);
1453 READ_ROW_GROUPS_TOTAL
1454 .with_label_values(&["minmax_index_filtered"])
1455 .inc_by(self.rg_minmax_filtered as u64);
1456 READ_ROW_GROUPS_TOTAL
1457 .with_label_values(&["bloom_filter_index_filtered"])
1458 .inc_by(self.rg_bloom_filtered as u64);
1459 READ_ROW_GROUPS_TOTAL
1460 .with_label_values(&["vector_index_filtered"])
1461 .inc_by(self.rg_vector_filtered as u64);
1462
1463 PRECISE_FILTER_ROWS_TOTAL
1464 .with_label_values(&["parquet"])
1465 .inc_by(self.rows_precise_filtered as u64);
1466 READ_ROWS_IN_ROW_GROUP_TOTAL
1467 .with_label_values(&["before_filtering"])
1468 .inc_by(self.rows_total as u64);
1469 READ_ROWS_IN_ROW_GROUP_TOTAL
1470 .with_label_values(&["fulltext_index_filtered"])
1471 .inc_by(self.rows_fulltext_filtered as u64);
1472 READ_ROWS_IN_ROW_GROUP_TOTAL
1473 .with_label_values(&["inverted_index_filtered"])
1474 .inc_by(self.rows_inverted_filtered as u64);
1475 READ_ROWS_IN_ROW_GROUP_TOTAL
1476 .with_label_values(&["bloom_filter_index_filtered"])
1477 .inc_by(self.rows_bloom_filtered as u64);
1478 READ_ROWS_IN_ROW_GROUP_TOTAL
1479 .with_label_values(&["vector_index_filtered"])
1480 .inc_by(self.rows_vector_filtered as u64);
1481 }
1482
1483 fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1484 match index_type {
1485 INDEX_TYPE_FULLTEXT => {
1486 self.rg_fulltext_filtered += row_group_count;
1487 self.rows_fulltext_filtered += row_count;
1488 }
1489 INDEX_TYPE_INVERTED => {
1490 self.rg_inverted_filtered += row_group_count;
1491 self.rows_inverted_filtered += row_count;
1492 }
1493 INDEX_TYPE_BLOOM => {
1494 self.rg_bloom_filtered += row_group_count;
1495 self.rows_bloom_filtered += row_count;
1496 }
1497 INDEX_TYPE_VECTOR => {
1498 self.rg_vector_filtered += row_group_count;
1499 self.rows_vector_filtered += row_count;
1500 }
1501 _ => {}
1502 }
1503 }
1504}
1505
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    #[test]
    fn test_vector_selection_from_offsets() {
        // Three row groups of four rows each: offsets 0 and 1 land in row group 0,
        // offset 5 in row group 1 and offset 9 in row group 2.
        let selection = vector_selection_from_offsets(vec![0, 1, 5, 9], 4, 3).unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        for row_group in 0..3 {
            assert!(selection.contains_non_empty_row_group(row_group));
        }
    }

    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        // The last offset does not fit in a u32 row id, so the whole conversion fails.
        let too_large = u64::from(u32::MAX) + 1;
        let selection = vector_selection_from_offsets(vec![0, 7, too_large], 4, 2);
        assert!(selection.is_err());
    }

    #[test]
    fn test_vector_selection_updates_metrics() {
        // Start from all 8 rows, in two row groups of 4 rows.
        let mut output = RowGroupSelection::new(4, 8);
        // The index keeps only row 1, which lives in row group 0.
        let selection = vector_selection_from_offsets(vec![1], 4, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        // Row group 1 disappears entirely and 7 of the 8 rows are pruned.
        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1557
/// Metrics collected while loading SST metadata.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Metadata loads served from the in-memory cache (presumably; set by the loader).
    pub(crate) mem_cache_hit: usize,
    /// Metadata loads served from the file cache (presumably; set by the loader).
    pub(crate) file_cache_hit: usize,
    /// Metadata loads that missed every cache (presumably; set by the loader).
    pub(crate) cache_miss: usize,
    /// Total time spent loading metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of reads issued while loading metadata.
    pub(crate) num_reads: usize,
    /// Number of bytes read while loading metadata.
    pub(crate) bytes_read: u64,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    /// Renders the metrics as a compact JSON-like object.
    ///
    /// Prints `{}` when empty. Otherwise always includes `metadata_load_cost` and only
    /// the counters that are non-zero.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            return f.write_str("{}");
        }

        write!(
            f,
            "{{\"metadata_load_cost\":\"{:?}\"",
            self.metadata_load_cost
        )?;

        // (name, should print, value) in output order.
        let counters: [(&str, bool, &dyn std::fmt::Display); 5] = [
            ("mem_cache_hit", self.mem_cache_hit > 0, &self.mem_cache_hit),
            ("file_cache_hit", self.file_cache_hit > 0, &self.file_cache_hit),
            ("cache_miss", self.cache_miss > 0, &self.cache_miss),
            ("num_reads", self.num_reads > 0, &self.num_reads),
            ("bytes_read", self.bytes_read > 0, &self.bytes_read),
        ];
        for (name, non_zero, value) in counters {
            if non_zero {
                write!(f, ", \"{name}\":{value}")?;
            }
        }

        f.write_str("}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true when no metadata load time was recorded.
    ///
    /// Only `metadata_load_cost` is inspected; the counters are ignored.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost == Duration::ZERO
    }

    /// Adds every counter and the load cost of `other` into `self`.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        let MetadataCacheMetrics {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *other;

        self.mem_cache_hit += mem_cache_hit;
        self.file_cache_hit += file_cache_hit;
        self.cache_miss += cache_miss;
        self.metadata_load_cost += metadata_load_cost;
        self.num_reads += num_reads;
        self.bytes_read += bytes_read;
    }
}
1629
/// Metrics of reading one or more SSTs.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Row group and row pruning metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Time spent building the reader.
    pub(crate) build_cost: Duration,
    /// Time spent scanning.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Number of rows read; reported to `READ_ROWS_TOTAL` by `observe_rows`.
    pub(crate) num_rows: usize,
    /// Metrics of loading SST metadata.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Shared fetch metrics, if any were recorded.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Metadata memory size attributed to this reader.
    /// NOTE(review): signed; its exact accounting lives outside this chunk.
    pub(crate) metadata_mem_size: isize,
    /// Number of range builders.
    /// NOTE(review): signed; its exact accounting lives outside this chunk.
    pub(crate) num_range_builders: isize,
}
1654
1655impl ReaderMetrics {
1656 pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1658 self.filter_metrics.merge_from(&other.filter_metrics);
1659 self.build_cost += other.build_cost;
1660 self.scan_cost += other.scan_cost;
1661 self.num_record_batches += other.num_record_batches;
1662 self.num_batches += other.num_batches;
1663 self.num_rows += other.num_rows;
1664 self.metadata_cache_metrics
1665 .merge_from(&other.metadata_cache_metrics);
1666 if let Some(other_fetch) = &other.fetch_metrics {
1667 if let Some(self_fetch) = &self.fetch_metrics {
1668 self_fetch.merge_from(other_fetch);
1669 } else {
1670 self.fetch_metrics = Some(other_fetch.clone());
1671 }
1672 }
1673 self.metadata_mem_size += other.metadata_mem_size;
1674 self.num_range_builders += other.num_range_builders;
1675 }
1676
1677 pub(crate) fn observe_rows(&self, read_type: &str) {
1679 READ_ROWS_TOTAL
1680 .with_label_values(&[read_type])
1681 .inc_by(self.num_rows as u64);
1682 }
1683}
1684
/// Builds record batch streams for individual row groups of one SST file.
pub(crate) struct RowGroupReaderBuilder {
    /// Handle of the SST file being read.
    file_handle: FileHandle,
    /// Path of the SST file in the object store.
    file_path: String,
    /// Parquet metadata of the file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Arrow reader metadata passed to every row group stream.
    arrow_metadata: ArrowReaderMetadata,
    /// Schema the nested-projection aligner produces.
    output_schema: SchemaRef,
    /// Object store the file is fetched from.
    object_store: ObjectStore,
    /// Projection mask plus root presence info used for nested projections.
    projection: ProjectionMaskPlan,
    /// Whether streams must be wrapped in a `NestedSchemaAligner`.
    has_nested_projection: bool,
    /// Cache strategy used when fetching pages.
    cache_strategy: CacheStrategy,
    /// When set, a prefilter refines each row group's selection before the main read.
    prefilter_builder: Option<PrefilterContextBuilder>,
}
1710
/// Per-call inputs for building the reader of a single row group.
pub(crate) struct RowGroupBuildContext<'a> {
    /// Index of the row group to read.
    pub(crate) row_group_idx: usize,
    /// Rows to read within the row group; `None` reads the whole row group.
    pub(crate) row_selection: Option<RowSelection>,
    /// Optional sink for fetch metrics (also receives prefilter cost and filtered rows).
    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
}
1721
1722impl RowGroupReaderBuilder {
1723 pub(crate) fn file_path(&self) -> &str {
1725 &self.file_path
1726 }
1727
1728 pub(crate) fn file_handle(&self) -> &FileHandle {
1730 &self.file_handle
1731 }
1732
1733 pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1734 &self.parquet_meta
1735 }
1736
1737 pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1738 &self.cache_strategy
1739 }
1740
1741 pub(crate) fn has_predicate_prefilter(&self) -> bool {
1742 self.prefilter_builder.is_some()
1743 }
1744
1745 pub(crate) async fn build(
1764 &self,
1765 build_ctx: RowGroupBuildContext<'_>,
1766 ) -> Result<ProjectedRecordBatchStream> {
1767 let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1768
1769 let Some(mut prefilter_ctx) = prefilter_ctx else {
1770 let stream = self
1772 .build_with_projection(
1773 build_ctx.row_group_idx,
1774 build_ctx.row_selection,
1775 self.projection.mask.clone(),
1776 build_ctx.fetch_metrics,
1777 )
1778 .await?;
1779 return self.make_projected_stream(stream);
1780 };
1781
1782 let prefilter_start = Instant::now();
1783 let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1784 if let Some(metrics) = build_ctx.fetch_metrics {
1785 let mut data = metrics.data.lock().unwrap();
1786 data.prefilter_cost += prefilter_start.elapsed();
1787 data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1788 }
1789
1790 let refined_selection = Some(prefilter_result.refined_selection);
1791
1792 let stream = self
1793 .build_with_projection(
1794 build_ctx.row_group_idx,
1795 refined_selection,
1796 self.projection.mask.clone(),
1797 build_ctx.fetch_metrics,
1798 )
1799 .await?;
1800 self.make_projected_stream(stream)
1801 }
1802
1803 fn make_projected_stream(
1804 &self,
1805 stream: ProjectedRecordBatchStream,
1806 ) -> Result<ProjectedRecordBatchStream> {
1807 if !self.has_nested_projection {
1808 return Ok(stream);
1809 }
1810
1811 Ok(NestedSchemaAligner::new(
1812 stream,
1813 self.projection.projected_root_presence.clone(),
1814 self.output_schema.clone(),
1815 )?
1816 .boxed())
1817 }
1818
1819 pub(crate) async fn build_with_projection(
1821 &self,
1822 row_group_idx: usize,
1823 row_selection: Option<RowSelection>,
1824 projection: ProjectionMask,
1825 fetch_metrics: Option<&ParquetFetchMetrics>,
1826 ) -> Result<ProjectedRecordBatchStream> {
1827 let range_fetcher = SstParquetRangeFetcher::new(
1828 self.file_handle.file_id(),
1829 self.file_path.clone(),
1830 self.object_store.clone(),
1831 self.cache_strategy.clone(),
1832 row_group_idx,
1833 fetch_metrics.cloned(),
1834 );
1835
1836 build_sst_parquet_record_batch_stream(
1837 self.arrow_metadata.clone(),
1838 row_group_idx,
1839 row_selection,
1840 projection,
1841 range_fetcher,
1842 self.file_path.clone(),
1843 )
1844 }
1845}
1846
/// A simple filter, or its outcome already decided from the column's default value.
#[derive(Clone)]
pub(crate) enum MaybeFilter {
    /// The filter must be evaluated against the data.
    Filter(SimpleFilterEvaluator),
    /// The column is absent from the SST and its default value satisfies the filter.
    Matched,
    /// The column is absent from the SST and its default value fails the filter.
    Pruned,
}
1857
1858impl MaybeFilter {
1859 pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1861 match self {
1862 MaybeFilter::Filter(filter) => Some(filter),
1863 MaybeFilter::Matched | MaybeFilter::Pruned => None,
1864 }
1865 }
1866}
1867
/// A single-column simple filter together with the column it applies to.
#[derive(Clone)]
pub(crate) struct SimpleFilterContext {
    /// The filter, or its precomputed outcome.
    filter: MaybeFilter,
    /// `Debug` rendering of the source expression.
    expr_str: String,
    /// Id of the filtered column.
    column_id: ColumnId,
    /// Semantic type of the filtered column.
    semantic_type: SemanticType,
}
1880
1881impl SimpleFilterContext {
1882 pub(crate) fn new_opt(
1887 sst_meta: &RegionMetadataRef,
1888 expected_meta: Option<&RegionMetadata>,
1889 expr: &Expr,
1890 ) -> Option<Self> {
1891 let filter = SimpleFilterEvaluator::try_new(expr)?;
1892 let expr_str = format!("{expr:?}");
1893 let (column_metadata, maybe_filter) = match expected_meta {
1894 Some(meta) => {
1895 let column = meta.column_by_name(filter.column_name())?;
1897 match sst_meta.column_by_id(column.column_id) {
1900 Some(sst_column) => {
1901 debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1902 let maybe_filter = if sst_column.column_schema.data_type
1907 == column.column_schema.data_type
1908 {
1909 MaybeFilter::Filter(filter)
1910 } else {
1911 debug_assert_eq!(column.semantic_type, SemanticType::Field);
1914 return None;
1915 };
1916 (column, maybe_filter)
1917 }
1918 None => {
1919 if pruned_by_default(&filter, column)? {
1923 (column, MaybeFilter::Pruned)
1924 } else {
1925 (column, MaybeFilter::Matched)
1926 }
1927 }
1928 }
1929 }
1930 None => {
1931 let column = sst_meta.column_by_name(filter.column_name())?;
1932 (column, MaybeFilter::Filter(filter))
1933 }
1934 };
1935
1936 Some(Self {
1937 filter: maybe_filter,
1938 expr_str,
1939 column_id: column_metadata.column_id,
1940 semantic_type: column_metadata.semantic_type,
1941 })
1942 }
1943
1944 pub(crate) fn filter(&self) -> &MaybeFilter {
1946 &self.filter
1947 }
1948
1949 pub(crate) fn expr_str(&self) -> &str {
1951 &self.expr_str
1952 }
1953
1954 pub(crate) fn column_id(&self) -> ColumnId {
1956 self.column_id
1957 }
1958
1959 pub(crate) fn semantic_type(&self) -> SemanticType {
1961 self.semantic_type
1962 }
1963}
1964
/// A single-column physical filter used by the prefilter.
#[derive(Clone)]
pub(crate) struct PhysicalFilterContext {
    /// Physical expression compiled against the single-field `schema`.
    filter: Arc<dyn PhysicalExpr>,
    /// `Debug` rendering of the source expression.
    expr_str: String,
    /// Id of the filtered column.
    column_id: ColumnId,
    /// Name of the filtered column.
    column_name: String,
    /// Semantic type of the filtered column.
    semantic_type: SemanticType,
    /// Arrow schema holding only the filtered column's field.
    schema: SchemaRef,
    /// Whether the expression contains no non-immutable functions or scalar variables.
    immutable: bool,
}
1983
1984impl PhysicalFilterContext {
1985 pub(crate) fn new_opt(
1990 sst_meta: &RegionMetadataRef,
1991 expected_meta: Option<&RegionMetadata>,
1992 read_format: &FlatReadFormat,
1993 expr: &Expr,
1994 ) -> Option<Self> {
1995 if !Self::is_prefilter_candidate(expr) {
1996 return None;
1997 }
1998 let expr_str = format!("{expr:?}");
1999 let column_name = Self::single_column_name(expr)?;
2000 let column_metadata = match expected_meta {
2001 Some(meta) => {
2002 let column = meta.column_by_name(&column_name)?;
2003 let sst_column = sst_meta.column_by_id(column.column_id)?;
2004 if sst_column.column_schema.name != column_name {
2006 return None;
2007 }
2008 column
2009 }
2010 None => sst_meta.column_by_name(&column_name)?,
2011 };
2012
2013 let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
2016 let field = field.clone();
2017 let schema = Arc::new(ArrowSchema::new(vec![field]));
2018 let physical_expr = Predicate::to_physical_expr(expr, &schema)
2019 .inspect_err(|e| {
2020 error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
2021 })
2022 .ok()?;
2023 let immutable = expr_is_immutable(expr);
2024
2025 Some(Self {
2026 filter: physical_expr,
2027 expr_str,
2028 column_id: column_metadata.column_id,
2029 column_name,
2030 semantic_type: column_metadata.semantic_type,
2031 schema,
2032 immutable,
2033 })
2034 }
2035
2036 fn is_prefilter_candidate(expr: &Expr) -> bool {
2041 matches!(
2042 expr,
2043 Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
2044 )
2045 }
2046
2047 fn single_column_name(expr: &Expr) -> Option<String> {
2048 let mut columns = HashSet::new();
2049 if expr_to_columns(expr, &mut columns).is_err() {
2050 return None;
2051 }
2052 if columns.len() != 1 {
2053 return None;
2054 }
2055 columns.iter().next().map(|column| column.name.clone())
2056 }
2057
2058 pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
2060 &self.filter
2061 }
2062
2063 pub(crate) fn expr_str(&self) -> &str {
2065 &self.expr_str
2066 }
2067
2068 pub(crate) fn column_id(&self) -> ColumnId {
2070 self.column_id
2071 }
2072
2073 pub(crate) fn column_name(&self) -> &str {
2075 &self.column_name
2076 }
2077
2078 pub(crate) fn semantic_type(&self) -> SemanticType {
2080 self.semantic_type
2081 }
2082
2083 pub(crate) fn schema(&self) -> &SchemaRef {
2085 &self.schema
2086 }
2087
2088 pub(crate) fn is_immutable(&self) -> bool {
2090 self.immutable
2091 }
2092}
2093
2094fn expr_is_immutable(expr: &Expr) -> bool {
2095 let mut is_immutable = true;
2096 let _ = expr.apply(|expr| match expr {
2097 Expr::ScalarFunction(function)
2098 if function.func.signature().volatility != Volatility::Immutable =>
2099 {
2100 is_immutable = false;
2101 Ok(TreeNodeRecursion::Stop)
2102 }
2103 Expr::ScalarVariable(_, _) => {
2104 is_immutable = false;
2105 Ok(TreeNodeRecursion::Stop)
2106 }
2107 _ => Ok(TreeNodeRecursion::Continue),
2108 });
2109 is_immutable
2110}
2111
2112fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
2115 let value = column.column_schema.create_default().ok().flatten()?;
2116 let scalar_value = value
2117 .try_to_scalar_value(&column.column_schema.data_type)
2118 .ok()?;
2119 let matches = filter.evaluate_scalar(&scalar_value).ok()?;
2120 Some(!matches)
2121}
2122
/// Reads the selected row groups of a parquet SST one row group at a time.
pub struct ParquetReader {
    /// Shared context of the file range being read.
    context: FileRangeContextRef,
    /// Row groups (and rows within them) that are still to be read.
    selection: RowGroupSelection,
    /// Reader of the current row group; `None` once it is exhausted or before the first.
    reader: Option<FlatPruneReader>,
    /// Fetch metrics shared by all row group readers built by this reader.
    fetch_metrics: ParquetFetchMetrics,
}
2134
2135impl ParquetReader {
2136 #[tracing::instrument(
2137 skip_all,
2138 fields(
2139 region_id = %self.context.reader_builder().file_handle.region_id(),
2140 file_id = %self.context.reader_builder().file_handle.file_id()
2141 )
2142 )]
2143 pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2144 loop {
2145 if let Some(reader) = &mut self.reader {
2146 if let Some(batch) = reader.next_batch().await? {
2147 return Ok(Some(batch));
2148 }
2149 self.reader = None;
2150 continue;
2151 }
2152
2153 let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
2154 return Ok(None);
2155 };
2156
2157 let skip_fields = self.context.pre_filter_mode().skip_fields();
2158 let parquet_reader = self
2159 .context
2160 .reader_builder()
2161 .build(self.context.build_context(
2162 row_group_idx,
2163 Some(row_selection),
2164 Some(&self.fetch_metrics),
2165 ))
2166 .await?;
2167 self.reader = Some(FlatPruneReader::new_with_row_group_reader(
2168 self.context.clone(),
2169 FlatRowGroupReader::new(self.context.clone(), parquet_reader),
2170 skip_fields,
2171 ));
2172 }
2173 }
2174 #[tracing::instrument(
2176 skip_all,
2177 fields(
2178 region_id = %context.reader_builder().file_handle.region_id(),
2179 file_id = %context.reader_builder().file_handle.file_id()
2180 )
2181 )]
2182 pub(crate) async fn new(
2183 context: FileRangeContextRef,
2184 mut selection: RowGroupSelection,
2185 ) -> Result<Self> {
2186 let fetch_metrics = ParquetFetchMetrics::default();
2187 let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
2188 let skip_fields = context.pre_filter_mode().skip_fields();
2189 let parquet_reader = context
2190 .reader_builder()
2191 .build(context.build_context(
2192 row_group_idx,
2193 Some(row_selection),
2194 Some(&fetch_metrics),
2195 ))
2196 .await?;
2197 Some(FlatPruneReader::new_with_row_group_reader(
2198 context.clone(),
2199 FlatRowGroupReader::new(context.clone(), parquet_reader),
2200 skip_fields,
2201 ))
2202 } else {
2203 None
2204 };
2205
2206 Ok(ParquetReader {
2207 context,
2208 selection,
2209 reader,
2210 fetch_metrics,
2211 })
2212 }
2213
2214 pub fn metadata(&self) -> &RegionMetadataRef {
2216 self.context.read_format().metadata()
2217 }
2218
2219 pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2220 self.context.reader_builder().parquet_meta.clone()
2221 }
2222}
2223
/// Reads one row group and converts its batches with the context's read format.
pub(crate) struct FlatRowGroupReader {
    /// Shared context of the file range being read.
    context: FileRangeContextRef,
    /// Underlying projected record batch stream of the row group.
    stream: ProjectedRecordBatchStream,
    /// Array passed to `convert_batch`. It comes from `new_override_sequence_array`;
    /// presumably `None` when sequences are not overridden (TODO confirm).
    override_sequence: Option<ArrayRef>,
}
2233
2234impl FlatRowGroupReader {
2235 pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2237 let override_sequence = context
2239 .read_format()
2240 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2241
2242 Self {
2243 context,
2244 stream,
2245 override_sequence,
2246 }
2247 }
2248
2249 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2251 match self.stream.next().await {
2252 Some(batch_result) => {
2253 let record_batch = batch_result?;
2254
2255 let record_batch = self
2256 .context
2257 .read_format()
2258 .convert_batch(record_batch, self.override_sequence.as_ref())?;
2259 Ok(Some(record_batch))
2260 }
2261 None => Ok(None),
2262 }
2263 }
2264}
2265
2266#[cfg(test)]
2267mod tests {
2268 use std::any::Any;
2269 use std::fmt::{Debug, Formatter};
2270 use std::sync::{Arc, LazyLock};
2271
2272 use datafusion::arrow::datatypes::DataType;
2273 use datafusion_common::ScalarValue;
2274 use datafusion_expr::expr::ScalarFunction;
2275 use datafusion_expr::{
2276 ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2277 col, lit,
2278 };
2279 use datatypes::arrow::array::{ArrayRef, Int64Array};
2280 use datatypes::arrow::record_batch::RecordBatch;
2281 use datatypes::prelude::ConcreteDataType;
2282 use datatypes::schema::ColumnSchema;
2283 use object_store::services::Memory;
2284 use parquet::arrow::ArrowWriter;
2285 use parquet::file::properties::WriterProperties;
2286 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2287 use store_api::region_request::PathType;
2288 use store_api::storage::RegionId;
2289 use table::predicate::Predicate;
2290
2291 use super::*;
2292 use crate::sst::parquet::metadata::MetadataLoader;
2293 use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2294
2295 #[tokio::test(flavor = "current_thread")]
2296 async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2297 #[derive(Eq, PartialEq, Hash)]
2298 struct PanicDebugUdf;
2299
2300 impl Debug for PanicDebugUdf {
2301 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2302 panic!("minmax predicate key should not format exprs when cache is disabled");
2303 }
2304 }
2305
2306 impl ScalarUDFImpl for PanicDebugUdf {
2307 fn as_any(&self) -> &dyn Any {
2308 self
2309 }
2310
2311 fn name(&self) -> &str {
2312 "panic_debug_udf"
2313 }
2314
2315 fn signature(&self) -> &Signature {
2316 static SIGNATURE: LazyLock<Signature> =
2317 LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2318 &SIGNATURE
2319 }
2320
2321 fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2322 Ok(DataType::Int64)
2323 }
2324
2325 fn invoke_with_args(
2326 &self,
2327 _args: ScalarFunctionArgs,
2328 ) -> datafusion_common::Result<ColumnarValue> {
2329 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2330 }
2331 }
2332
2333 let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2334 let file_handle = sst_file_handle(0, 1);
2335 let table_dir = "test_table".to_string();
2336 let path_type = PathType::Bare;
2337 let file_path = file_handle.file_path(&table_dir, path_type);
2338
2339 let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2340 let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2341 let mut parquet_bytes = Vec::new();
2342 let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2343 writer.write(&batch).unwrap();
2344 writer.close().unwrap();
2345 let file_size = parquet_bytes.len() as u64;
2346 object_store.write(&file_path, parquet_bytes).await.unwrap();
2347
2348 let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2349 let read_format = FlatReadFormat::new(
2350 region_metadata.clone(),
2351 ReadColumns::from_deduped_column_ids(
2352 region_metadata
2353 .column_metadatas
2354 .iter()
2355 .map(|column| column.column_id),
2356 ),
2357 None,
2358 &file_path,
2359 false,
2360 )
2361 .unwrap();
2362
2363 let mut cache_metrics = MetadataCacheMetrics::default();
2364 let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2365 let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2366
2367 let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2368 let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2369 udf,
2370 vec![],
2371 ))]);
2372 let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2373 .predicate(Some(predicate))
2374 .cache(CacheStrategy::Disabled);
2375
2376 let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2377 let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2378 let mut metrics = ReaderFilterMetrics::default();
2379 let selection = builder.row_groups_by_minmax(
2380 &read_format,
2381 &parquet_meta,
2382 row_group_size,
2383 total_row_count,
2384 &mut metrics,
2385 false,
2386 );
2387
2388 assert!(!selection.is_empty());
2389 }
2390
2391 #[test]
2392 fn test_expr_is_immutable_checks_scalar_function_volatility() {
2393 #[derive(Debug, PartialEq, Eq, Hash)]
2394 struct TestVolatilityUdf {
2395 name: String,
2396 signature: Signature,
2397 }
2398
2399 impl TestVolatilityUdf {
2400 fn new(name: &str, volatility: Volatility) -> Self {
2401 Self {
2402 name: name.to_string(),
2403 signature: Signature::variadic_any(volatility),
2404 }
2405 }
2406 }
2407
2408 impl ScalarUDFImpl for TestVolatilityUdf {
2409 fn as_any(&self) -> &dyn Any {
2410 self
2411 }
2412
2413 fn name(&self) -> &str {
2414 &self.name
2415 }
2416
2417 fn signature(&self) -> &Signature {
2418 &self.signature
2419 }
2420
2421 fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2422 Ok(DataType::Int64)
2423 }
2424
2425 fn invoke_with_args(
2426 &self,
2427 _args: ScalarFunctionArgs,
2428 ) -> datafusion_common::Result<ColumnarValue> {
2429 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2430 }
2431 }
2432
2433 let expr = |name: &str, volatility| {
2434 Expr::ScalarFunction(ScalarFunction::new_udf(
2435 Arc::new(ScalarUDF::new_from_impl(TestVolatilityUdf::new(
2436 name, volatility,
2437 ))),
2438 vec![],
2439 ))
2440 };
2441
2442 assert!(expr_is_immutable(&expr(
2443 "immutable_udf",
2444 Volatility::Immutable
2445 )));
2446 assert!(!expr_is_immutable(&expr("stable_udf", Volatility::Stable)));
2447 assert!(!expr_is_immutable(&expr(
2448 "volatile_udf",
2449 Volatility::Volatile
2450 )));
2451
2452 let scalar_variable = Expr::ScalarVariable(
2453 Arc::new(Field::new("@@version", DataType::Utf8, false)),
2454 vec!["@@version".to_string()],
2455 );
2456 assert!(!expr_is_immutable(&scalar_variable));
2457 }
2458
2459 #[tokio::test(flavor = "current_thread")]
2460 async fn test_has_row_level_selection() {
2461 let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2462 let file_path = "row_level_selection.parquet";
2463
2464 let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
2465 let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2466 let props = WriterProperties::builder()
2467 .set_max_row_group_row_count(Some(3))
2468 .build();
2469 let mut parquet_bytes = Vec::new();
2470 let mut writer =
2471 ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
2472 writer.write(&batch).unwrap();
2473 writer.close().unwrap();
2474 let file_size = parquet_bytes.len() as u64;
2475 object_store.write(file_path, parquet_bytes).await.unwrap();
2476
2477 let mut cache_metrics = MetadataCacheMetrics::default();
2478 let loader = MetadataLoader::new(object_store, file_path, file_size);
2479 let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2480 assert_eq!(2, parquet_meta.num_row_groups());
2481
2482 let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
2483 assert!(!has_row_level_selection(&full_row_groups, &parquet_meta));
2484
2485 let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
2486 assert!(has_row_level_selection(&prefix_selection, &parquet_meta));
2487
2488 let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
2489 assert!(has_row_level_selection(&interior_selection, &parquet_meta));
2490 }
2491
2492 fn expected_metadata_with_reused_tag_name(
2493 old_metadata: &RegionMetadata,
2494 ) -> Arc<RegionMetadata> {
2495 let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2496 builder
2497 .push_column_metadata(ColumnMetadata {
2498 column_schema: ColumnSchema::new(
2499 "tag_0".to_string(),
2500 ConcreteDataType::string_datatype(),
2501 true,
2502 ),
2503 semantic_type: SemanticType::Tag,
2504 column_id: 10,
2505 })
2506 .push_column_metadata(ColumnMetadata {
2507 column_schema: ColumnSchema::new(
2508 "tag_1".to_string(),
2509 ConcreteDataType::string_datatype(),
2510 true,
2511 ),
2512 semantic_type: SemanticType::Tag,
2513 column_id: 1,
2514 })
2515 .push_column_metadata(ColumnMetadata {
2516 column_schema: ColumnSchema::new(
2517 "field_0".to_string(),
2518 ConcreteDataType::uint64_datatype(),
2519 true,
2520 ),
2521 semantic_type: SemanticType::Field,
2522 column_id: 2,
2523 })
2524 .push_column_metadata(ColumnMetadata {
2525 column_schema: ColumnSchema::new(
2526 "ts".to_string(),
2527 ConcreteDataType::timestamp_millisecond_datatype(),
2528 false,
2529 ),
2530 semantic_type: SemanticType::Timestamp,
2531 column_id: 3,
2532 })
2533 .primary_key(vec![10, 1]);
2534
2535 Arc::new(builder.build().unwrap())
2536 }
2537
2538 #[test]
2539 fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
2540 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2541 let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2542 let ctx = SimpleFilterContext::new_opt(
2543 &metadata,
2544 Some(expected_metadata.as_ref()),
2545 &col("tag_0").eq(lit("a")),
2546 )
2547 .unwrap();
2548 assert!(matches!(
2549 ctx.filter(),
2550 MaybeFilter::Matched | MaybeFilter::Pruned
2551 ));
2552 }
2553
2554 #[test]
2555 fn test_simple_filter_context_drops_mismatched_field_filter() {
2556 let (sst_metadata, latest_metadata) = mock_metadata();
2557 let ctx = SimpleFilterContext::new_opt(
2558 &sst_metadata,
2559 Some(latest_metadata.as_ref()),
2560 &col("field_0").eq(lit(1_i64)),
2561 );
2562
2563 assert!(ctx.is_none());
2564 }
2565
2566 fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
2567 let region_id = RegionId::new(1, 1);
2568 let make_tag_0 = || ColumnMetadata {
2569 column_schema: ColumnSchema::new(
2570 "tag_0".to_string(),
2571 ConcreteDataType::string_datatype(),
2572 true,
2573 ),
2574 semantic_type: SemanticType::Tag,
2575 column_id: 0,
2576 };
2577 let make_ts = || ColumnMetadata {
2578 column_schema: ColumnSchema::new(
2579 "ts".to_string(),
2580 ConcreteDataType::timestamp_millisecond_datatype(),
2581 false,
2582 ),
2583 semantic_type: SemanticType::Timestamp,
2584 column_id: 2,
2585 };
2586 let make_field_0 = |data_type| ColumnMetadata {
2587 column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
2588 semantic_type: SemanticType::Field,
2589 column_id: 1,
2590 };
2591
2592 let mut sst_builder = RegionMetadataBuilder::new(region_id);
2593 sst_builder
2594 .push_column_metadata(make_tag_0())
2595 .push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
2596 .push_column_metadata(make_ts())
2597 .primary_key(vec![0]);
2598 let sst_metadata = Arc::new(sst_builder.build().unwrap());
2599
2600 let mut expected_builder = RegionMetadataBuilder::new(region_id);
2601 expected_builder
2602 .push_column_metadata(make_tag_0())
2603 .push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
2604 .push_column_metadata(make_ts())
2605 .primary_key(vec![0]);
2606
2607 let expected_metadata = Arc::new(expected_builder.build().unwrap());
2608
2609 (sst_metadata, expected_metadata)
2610 }
2611
2612 #[test]
2613 fn test_physical_filter_context_skips_renamed_column() {
2614 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2615 let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2616 let read_format = FlatReadFormat::new(
2617 metadata.clone(),
2618 ReadColumns::from_deduped_column_ids(
2619 metadata.column_metadatas.iter().map(|c| c.column_id),
2620 ),
2621 None,
2622 "test",
2623 true,
2624 )
2625 .unwrap();
2626
2627 let ctx = PhysicalFilterContext::new_opt(
2628 &metadata,
2629 Some(expected_metadata.as_ref()),
2630 &read_format,
2631 &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
2632 );
2633
2634 assert!(ctx.is_none());
2635 }
2636
2637 #[test]
2638 fn test_physical_filter_context_only_accepts_prefilter_candidates() {
2639 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2640 let read_format = FlatReadFormat::new(
2641 metadata.clone(),
2642 ReadColumns::from_deduped_column_ids(
2643 metadata.column_metadatas.iter().map(|c| c.column_id),
2644 ),
2645 None,
2646 "test",
2647 true,
2648 )
2649 .unwrap();
2650
2651 let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
2653 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
2654
2655 let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
2657 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, ¬_in).is_some());
2658
2659 let is_null = col("tag_0").is_null();
2661 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
2662 let is_not_null = col("tag_0").is_not_null();
2663 assert!(
2664 PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
2665 );
2666
2667 let between = col("field_0").between(lit(1_u64), lit(10_u64));
2669 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
2670
2671 let binary = col("tag_0").eq(lit("a"));
2673 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
2674 }
2675}