1mod stream;
18
19#[cfg(feature = "vector_index")]
20use std::collections::BTreeSet;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{error, tracing, warn};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::{Expr, Volatility};
32use datatypes::arrow::array::ArrayRef;
33use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::data_type::ConcreteDataType;
36use datatypes::extension::json::is_structured_json_field;
37use datatypes::prelude::DataType;
38use futures::StreamExt;
39use mito_codec::row_converter::build_primary_key_codec;
40use object_store::ObjectStore;
41use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
42use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema};
43use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
44use parquet::file::properties::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT;
45use partition::expr::PartitionExpr;
46use snafu::ResultExt;
47use store_api::codec::PrimaryKeyEncoding;
48use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
49use store_api::region_request::PathType;
50use store_api::storage::{ColumnId, FileId};
51use table::predicate::Predicate;
52
53use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream};
54use crate::cache::index::result_cache::PredicateKey;
55use crate::cache::{CacheStrategy, CachedSstMeta};
56#[cfg(feature = "vector_index")]
57use crate::error::ApplyVectorIndexSnafu;
58use crate::error::{
59 ParquetToArrowSchemaSnafu, ReadDataPartSnafu, Result, SerializePartitionExprSnafu,
60};
61use crate::metrics::{
62 PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
63 READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
64};
65use crate::read::flat_projection::CompactionProjectionMapper;
66use crate::read::prune::FlatPruneReader;
67use crate::read::read_columns::ReadColumns;
68use crate::sst::file::FileHandle;
69use crate::sst::index::bloom_filter::applier::{
70 BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
71};
72use crate::sst::index::fulltext_index::applier::{
73 FulltextIndexApplierRef, FulltextIndexApplyMetrics,
74};
75use crate::sst::index::inverted_index::applier::{
76 InvertedIndexApplierRef, InvertedIndexApplyMetrics,
77};
78#[cfg(feature = "vector_index")]
79use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
80use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
81use crate::sst::parquet::file_range::{
82 FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
83};
84use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
85use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, need_override_sequence};
86use crate::sst::parquet::metadata::MetadataLoader;
87use crate::sst::parquet::prefilter::{
88 PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
89};
90use crate::sst::parquet::push_decoder::{
91 SstParquetRangeFetcher, build_sst_parquet_record_batch_stream,
92};
93use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
94use crate::sst::parquet::row_group::ParquetFetchMetrics;
95use crate::sst::parquet::row_selection::RowGroupSelection;
96use crate::sst::parquet::stats::RowGroupPruningStats;
97use crate::sst::{override_pk_field_to_binary, tag_maybe_to_dictionary_field};
98
/// Label identifying the fulltext index; passed to the index pruning helpers
/// and to `handle_index_error!` when applying a fulltext index.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";

/// Maximum number of leading row groups inspected by
/// `should_read_pk_as_binary_with_limit`.
const MAX_ROW_GROUPS_TO_CHECK_PK: usize = 4;
103
104fn should_read_pk_as_binary(parquet_meta: &ParquetMetaData) -> bool {
108 should_read_pk_as_binary_with_limit(parquet_meta, DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT)
109}
110
111fn should_read_pk_as_binary_with_limit(
112 parquet_meta: &ParquetMetaData,
113 dict_page_size_limit: usize,
114) -> bool {
115 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
116 if num_columns < INTERNAL_COLUMN_NUM {
117 return false;
118 }
119 let pk_idx = primary_key_column_index(num_columns);
120 parquet_meta
121 .row_groups()
122 .iter()
123 .take(MAX_ROW_GROUPS_TO_CHECK_PK)
124 .any(|rg| rg.column(pk_idx).uncompressed_size() as usize > dict_page_size_limit)
125}
/// Label identifying the inverted index in index pruning helpers and error logs.
const INDEX_TYPE_INVERTED: &str = "inverted";
/// Label identifying the bloom filter index in index pruning helpers and error logs.
const INDEX_TYPE_BLOOM: &str = "bloom filter";
/// Label identifying the vector index in index pruning helpers and error logs.
const INDEX_TYPE_VECTOR: &str = "vector";
129
/// Reports a failure to apply an index to an SST file.
///
/// Arguments: the error, the file handle (must provide `region_id()` and
/// `file_id()`), and a label naming the index type.
///
/// When compiled for tests (`cfg(test)` or the `test` feature) the failure
/// panics so it cannot go unnoticed; otherwise it is only logged as a warning
/// and the caller decides how to continue.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
150
/// Builder that collects everything needed to read one parquet SST file:
/// location, predicate, projection, caches and index appliers.
pub struct ParquetReaderBuilder {
    /// Table directory; combined with `path_type` to resolve the SST file path.
    table_dir: String,
    /// Path type used together with `table_dir` to resolve the SST file path.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store the file (and its metadata) is read from.
    object_store: ObjectStore,
    /// Optional predicate used to prune row groups and filter rows.
    predicate: Option<Predicate>,
    /// Columns to read. When `None`, all columns of the expected metadata
    /// (or of the SST's own region metadata) are read.
    read_cols: Option<ReadColumns>,
    /// Cache strategy for SST metadata and index results.
    cache_strategy: CacheStrategy,
    /// Inverted index appliers. Only the first slot is used when the
    /// pre-filter mode skips fields.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers. Only the first slot is used when the
    /// pre-filter mode skips fields.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers. Only the first slot is used when the
    /// pre-filter mode skips fields.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier; vector pruning only runs when this and
    /// `vector_index_k` are both set.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// The `k` passed to the vector index applier.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Metadata the caller expects; when set it supplies the prune schema,
    /// default projection and region partition expression.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether the reader is built for compaction.
    compaction: bool,
    /// Pre-filter mode; also decides whether field-related appliers are skipped.
    pre_filter_mode: PreFilterMode,
    /// Flag set via `decode_primary_key_values()`.
    // NOTE(review): not consumed in the visible part of this file — confirm where it is read.
    decode_primary_key_values: bool,
    /// Policy for loading the parquet page index.
    page_index_policy: PageIndexPolicy,
    /// When `true` and the policy is `Optional`, metadata is first read without
    /// the page index and the page index is loaded only if it is needed.
    defer_optional_page_index: bool,
}
191
192impl ParquetReaderBuilder {
193 pub fn new(
195 table_dir: String,
196 path_type: PathType,
197 file_handle: FileHandle,
198 object_store: ObjectStore,
199 ) -> ParquetReaderBuilder {
200 ParquetReaderBuilder {
201 table_dir,
202 path_type,
203 file_handle,
204 object_store,
205 predicate: None,
206 read_cols: None,
207 cache_strategy: CacheStrategy::Disabled,
208 inverted_index_appliers: [None, None],
209 bloom_filter_index_appliers: [None, None],
210 fulltext_index_appliers: [None, None],
211 #[cfg(feature = "vector_index")]
212 vector_index_applier: None,
213 #[cfg(feature = "vector_index")]
214 vector_index_k: None,
215 expected_metadata: None,
216 compaction: false,
217 pre_filter_mode: PreFilterMode::All,
218 decode_primary_key_values: false,
219 page_index_policy: Default::default(),
220 defer_optional_page_index: false,
221 }
222 }
223
224 #[must_use]
226 pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
227 self.predicate = predicate;
228 self
229 }
230
231 #[must_use]
235 pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
236 self.read_cols = read_cols;
237 self
238 }
239
240 #[must_use]
242 pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
243 self.cache_strategy = cache;
244 self
245 }
246
247 #[must_use]
249 pub(crate) fn inverted_index_appliers(
250 mut self,
251 index_appliers: [Option<InvertedIndexApplierRef>; 2],
252 ) -> Self {
253 self.inverted_index_appliers = index_appliers;
254 self
255 }
256
257 #[must_use]
259 pub(crate) fn bloom_filter_index_appliers(
260 mut self,
261 index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
262 ) -> Self {
263 self.bloom_filter_index_appliers = index_appliers;
264 self
265 }
266
267 #[must_use]
269 pub(crate) fn fulltext_index_appliers(
270 mut self,
271 index_appliers: [Option<FulltextIndexApplierRef>; 2],
272 ) -> Self {
273 self.fulltext_index_appliers = index_appliers;
274 self
275 }
276
/// Attaches the vector index applier together with the `k` it is applied with.
///
/// Vector pruning is skipped unless both values are `Some`.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn vector_index_applier(
    self,
    applier: Option<VectorIndexApplierRef>,
    k: Option<usize>,
) -> Self {
    Self {
        vector_index_applier: applier,
        vector_index_k: k,
        ..self
    }
}
289
290 #[must_use]
292 pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
293 self.expected_metadata = expected_metadata;
294 self
295 }
296
297 #[must_use]
299 pub fn compaction(mut self, compaction: bool) -> Self {
300 self.compaction = compaction;
301 self
302 }
303
304 #[must_use]
306 pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
307 self.pre_filter_mode = pre_filter_mode;
308 self
309 }
310
311 #[must_use]
313 pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
314 self.decode_primary_key_values = decode;
315 self
316 }
317
318 #[must_use]
319 pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
320 self.page_index_policy = page_index_policy;
321 self
322 }
323
324 #[must_use]
326 pub(crate) fn deferred_optional_page_index(mut self) -> Self {
327 self.page_index_policy = PageIndexPolicy::Optional;
328 self.defer_optional_page_index = true;
329 self
330 }
331
332 #[tracing::instrument(
336 skip_all,
337 fields(
338 region_id = %self.file_handle.region_id(),
339 file_id = %self.file_handle.file_id()
340 )
341 )]
342 pub async fn build(&self) -> Result<Option<ParquetReader>> {
343 let mut metrics = ReaderMetrics::default();
344
345 let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
346 return Ok(None);
347 };
348 ParquetReader::new(Arc::new(context), selection)
349 .await
350 .map(Some)
351 }
352
353 #[tracing::instrument(
357 skip_all,
358 fields(
359 region_id = %self.file_handle.region_id(),
360 file_id = %self.file_handle.file_id()
361 )
362 )]
363 pub async fn build_reader_input(
364 &self,
365 metrics: &mut ReaderMetrics,
366 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
367 self.build_reader_input_inner(metrics).await
368 }
369
370 async fn build_reader_input_inner(
371 &self,
372 metrics: &mut ReaderMetrics,
373 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
374 let start = Instant::now();
375
376 let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
377 let file_size = self.file_handle.meta_ref().file_size;
378
379 let initial_page_index_policy = if self.defer_optional_page_index
381 && self.page_index_policy == PageIndexPolicy::Optional
382 {
383 PageIndexPolicy::Skip
384 } else {
385 self.page_index_policy
386 };
387 let (sst_meta, mut cache_miss) = self
388 .read_parquet_metadata(
389 &file_path,
390 file_size,
391 &mut metrics.metadata_cache_metrics,
392 initial_page_index_policy,
393 )
394 .await?;
395 let mut parquet_meta = sst_meta.parquet_metadata();
396 let region_meta = sst_meta.region_metadata();
397 let region_partition_expr_str = self
398 .expected_metadata
399 .as_ref()
400 .and_then(|meta| meta.partition_expr.as_ref())
401 .map(|expr| expr.as_str());
402 let (_, is_same_region_partition) = Self::is_same_region_partition(
403 region_partition_expr_str,
404 self.file_handle.meta_ref().partition_expr.as_ref(),
405 )?;
406 let skip_auto_convert = self.compaction && is_same_region_partition;
410
411 let compaction_projection_mapper = if self.compaction
420 && !is_same_region_partition
421 && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
422 {
423 Some(CompactionProjectionMapper::try_new(®ion_meta)?)
424 } else {
425 None
426 };
427
428 let read_cols = if let Some(read_cols) = &self.read_cols {
429 read_cols.clone()
430 } else {
431 let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta);
432 ReadColumns::from_deduped_column_ids(
434 expected_meta
435 .column_metadatas
436 .iter()
437 .map(|col| col.column_id),
438 )
439 };
440
441 let file_metadata = parquet_meta.file_metadata();
442 let parquet_schema_desc = file_metadata.schema_descr();
443 let file_schema =
444 parquet_to_arrow_schema(parquet_schema_desc, file_metadata.key_value_metadata())
445 .context(ParquetToArrowSchemaSnafu { file: &file_path })?;
446 let mut read_format = FlatReadFormat::new(
447 region_meta.clone(),
448 read_cols,
449 Some(Arc::new(file_schema)),
450 &file_path,
451 skip_auto_convert,
452 )?;
453 if need_override_sequence(&parquet_meta) {
454 read_format
455 .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
456 }
457
458 let parquet_read_cols = read_format.parquet_read_columns();
460 let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
461 let has_nested_projection = parquet_read_cols.has_nested();
462 let selection = self
463 .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
464 .await;
465
466 if selection.is_empty() {
467 metrics.build_cost += start.elapsed();
468 return Ok(None);
469 }
470
471 let prune_schema = self
472 .expected_metadata
473 .as_ref()
474 .map(|meta| meta.schema.clone())
475 .unwrap_or_else(|| region_meta.schema.clone());
476
477 let dyn_filters = if let Some(predicate) = &self.predicate {
478 predicate.dyn_filters().as_ref().clone()
479 } else {
480 vec![]
481 };
482
483 let codec = build_primary_key_codec(read_format.metadata());
484
485 let filter_plan = build_reader_filter_plan(
486 self.predicate.as_ref(),
487 self.expected_metadata.as_deref(),
488 self.pre_filter_mode,
489 &read_format,
490 &codec,
491 );
492
493 if self.defer_optional_page_index
494 && self.page_index_policy == PageIndexPolicy::Optional
495 && (filter_plan.prefilter_builder.is_some()
496 || has_row_level_selection(&selection, &parquet_meta))
497 {
498 let (sst_meta, page_index_cache_miss) = self
499 .read_parquet_metadata(
500 &file_path,
501 file_size,
502 &mut metrics.metadata_cache_metrics,
503 PageIndexPolicy::Optional,
504 )
505 .await?;
506 parquet_meta = sst_meta.parquet_metadata();
507 cache_miss |= page_index_cache_miss;
508 }
509
510 if cache_miss && !selection.is_empty() {
512 use crate::cache::file_cache::{FileType, IndexKey};
513 let index_key = IndexKey::new(
514 self.file_handle.region_id(),
515 self.file_handle.file_id().file_id(),
516 FileType::Parquet,
517 );
518 self.cache_strategy.maybe_download_background(
519 index_key,
520 file_path.clone(),
521 self.object_store.clone(),
522 file_size,
523 );
524 }
525
526 let mut arrow_reader_options = ArrowReaderOptions::new();
528 if !read_format
529 .arrow_schema()
530 .fields()
531 .iter()
532 .any(is_structured_json_field)
533 {
534 let schema_for_reader = if should_read_pk_as_binary(&parquet_meta) {
537 read_format.set_pk_as_binary()?;
538 override_pk_field_to_binary(read_format.arrow_schema())
539 } else {
540 read_format.arrow_schema().clone()
541 };
542 arrow_reader_options = arrow_reader_options.with_schema(schema_for_reader);
543 }
544 let arrow_metadata =
545 ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
546 .context(ReadDataPartSnafu)?;
547
548 let output_schema = read_format.output_arrow_schema()?;
549
550 let reader_builder = RowGroupReaderBuilder {
551 file_handle: self.file_handle.clone(),
552 file_path,
553 parquet_meta,
554 arrow_metadata,
555 output_schema,
556 object_store: self.object_store.clone(),
557 projection: projection_plan,
558 has_nested_projection,
559 cache_strategy: self.cache_strategy.clone(),
560 prefilter_builder: filter_plan.prefilter_builder,
561 };
562
563 let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
564
565 let context = FileRangeContext::new(
566 reader_builder,
567 RangeBase {
568 filters: filter_plan.remaining_simple_filters,
569 dyn_filters,
570 read_format,
571 expected_metadata: self.expected_metadata.clone(),
572 prune_schema,
573 codec,
574 compat_batch: None,
575 compaction_projection_mapper,
576 pre_filter_mode: self.pre_filter_mode,
577 partition_filter,
578 },
579 );
580
581 metrics.build_cost += start.elapsed();
582
583 Ok(Some((context, selection)))
584 }
585
586 fn is_same_region_partition(
587 region_partition_expr_str: Option<&str>,
588 file_partition_expr: Option<&PartitionExpr>,
589 ) -> Result<(Option<PartitionExpr>, bool)> {
590 let region_partition_expr = match region_partition_expr_str {
591 Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
592 None => None,
593 };
594
595 let is_same = region_partition_expr.as_ref() == file_partition_expr;
596 Ok((region_partition_expr, is_same))
597 }
598
599 fn build_partition_filter(
602 &self,
603 read_format: &FlatReadFormat,
604 prune_schema: &Arc<datatypes::schema::Schema>,
605 ) -> Result<Option<PartitionFilterContext>> {
606 let region_partition_expr_str = self
607 .expected_metadata
608 .as_ref()
609 .and_then(|meta| meta.partition_expr.as_ref());
610 let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
611
612 let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
613 region_partition_expr_str.map(|s| s.as_str()),
614 file_partition_expr_ref,
615 )?;
616
617 if is_same_region_partition {
618 return Ok(None);
619 }
620
621 let Some(region_partition_expr) = region_partition_expr else {
622 return Ok(None);
623 };
624
625 let mut referenced_columns = HashSet::new();
627 region_partition_expr.collect_column_names(&mut referenced_columns);
628
629 let partition_schema = Arc::new(datatypes::schema::Schema::new(
631 prune_schema
632 .column_schemas()
633 .iter()
634 .filter(|col| referenced_columns.contains(&col.name))
635 .map(|col| {
636 if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
637 && column_meta.semantic_type == SemanticType::Tag
638 && col.data_type.is_string()
639 {
640 let field = Arc::new(Field::new(
641 &col.name,
642 col.data_type.as_arrow_type(),
643 col.is_nullable(),
644 ));
645 let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
646 let mut column = col.clone();
647 column.data_type =
648 ConcreteDataType::from_arrow_type(dict_field.data_type());
649 return column;
650 }
651
652 col.clone()
653 })
654 .collect::<Vec<_>>(),
655 ));
656
657 let region_partition_physical_expr = region_partition_expr
658 .try_as_physical_expr(partition_schema.arrow_schema())
659 .context(SerializePartitionExprSnafu)?;
660
661 Ok(Some(PartitionFilterContext {
662 region_partition_physical_expr,
663 partition_schema,
664 }))
665 }
666
667 pub(crate) async fn read_parquet_metadata(
670 &self,
671 file_path: &str,
672 file_size: u64,
673 cache_metrics: &mut MetadataCacheMetrics,
674 page_index_policy: PageIndexPolicy,
675 ) -> Result<(Arc<CachedSstMeta>, bool)> {
676 let start = Instant::now();
677 let _t = READ_STAGE_ELAPSED
678 .with_label_values(&["read_parquet_metadata"])
679 .start_timer();
680
681 let file_id = self.file_handle.file_id();
682 if let Some(metadata) = self
684 .cache_strategy
685 .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
686 .await
687 {
688 cache_metrics.metadata_load_cost += start.elapsed();
689 return Ok((metadata, false));
690 }
691
692 let mut metadata_loader =
694 MetadataLoader::new(self.object_store.clone(), file_path, file_size);
695 metadata_loader.with_page_index_policy(page_index_policy);
696 let metadata = metadata_loader.load(cache_metrics).await?;
697
698 let metadata = Arc::new(CachedSstMeta::try_new_with_page_index_policy(
699 file_path,
700 metadata,
701 None,
702 page_index_policy,
703 )?);
704 self.cache_strategy
706 .put_sst_meta_data(file_id, metadata.clone());
707
708 cache_metrics.metadata_load_cost += start.elapsed();
709 Ok((metadata, true))
710 }
711
712 #[tracing::instrument(
714 skip_all,
715 fields(
716 region_id = %self.file_handle.region_id(),
717 file_id = %self.file_handle.file_id()
718 )
719 )]
720 async fn row_groups_to_read(
721 &self,
722 read_format: &FlatReadFormat,
723 parquet_meta: &ParquetMetaData,
724 metrics: &mut ReaderFilterMetrics,
725 ) -> RowGroupSelection {
726 let num_row_groups = parquet_meta.num_row_groups();
727 let num_rows = parquet_meta.file_metadata().num_rows();
728 if num_row_groups == 0 || num_rows == 0 {
729 return RowGroupSelection::default();
730 }
731
732 let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
735 if row_group_size == 0 {
736 return RowGroupSelection::default();
737 }
738
739 metrics.rg_total += num_row_groups;
740 metrics.rows_total += num_rows as usize;
741
742 let skip_fields = self.pre_filter_mode.skip_fields();
744
745 let mut output = self.row_groups_by_minmax(
746 read_format,
747 parquet_meta,
748 row_group_size,
749 num_rows as usize,
750 metrics,
751 skip_fields,
752 );
753 if output.is_empty() {
754 return output;
755 }
756
757 let fulltext_filtered = self
758 .prune_row_groups_by_fulltext_index(
759 row_group_size,
760 num_row_groups,
761 &mut output,
762 metrics,
763 skip_fields,
764 )
765 .await;
766 if output.is_empty() {
767 return output;
768 }
769
770 self.prune_row_groups_by_inverted_index(
771 read_format.metadata(),
772 row_group_size,
773 num_row_groups,
774 &mut output,
775 metrics,
776 skip_fields,
777 )
778 .await;
779 if output.is_empty() {
780 return output;
781 }
782
783 self.prune_row_groups_by_bloom_filter(
784 read_format.metadata(),
785 row_group_size,
786 parquet_meta,
787 &mut output,
788 metrics,
789 skip_fields,
790 )
791 .await;
792 if output.is_empty() {
793 return output;
794 }
795
796 if !fulltext_filtered {
797 self.prune_row_groups_by_fulltext_bloom(
798 row_group_size,
799 parquet_meta,
800 &mut output,
801 metrics,
802 skip_fields,
803 )
804 .await;
805 }
806 #[cfg(feature = "vector_index")]
807 {
808 self.prune_row_groups_by_vector_index(
809 row_group_size,
810 num_row_groups,
811 &mut output,
812 metrics,
813 )
814 .await;
815 if output.is_empty() {
816 return output;
817 }
818 }
819 output
820 }
821
/// Prunes `output` with the fulltext index via the appliers' `apply_fine`.
///
/// Returns `true` if at least one applier produced (or had cached) a result
/// that was applied to `output`; `false` if the file has no fulltext index or
/// no applier yielded a result.
///
/// When `skip_fields` is set only the first applier slot is used.
async fn prune_row_groups_by_fulltext_index(
    &self,
    row_group_size: usize,
    num_row_groups: usize,
    output: &mut RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    skip_fields: bool,
) -> bool {
    if !self.file_handle.meta_ref().fulltext_index_available() {
        return false;
    }

    let mut pruned = false;
    // Restrict to the first applier when fields are skipped.
    let appliers = if skip_fields {
        &self.fulltext_index_appliers[..1]
    } else {
        &self.fulltext_index_appliers[..]
    };
    for index_applier in appliers.iter().flatten() {
        let predicate_key = index_applier.predicate_key();
        // Reuse a cached result if it covers every row group still selected.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            metrics.fulltext_index_cache_hit += 1;
            pruned = true;
            continue;
        }

        // Cache miss (or cached result insufficient): query the index.
        metrics.fulltext_index_cache_miss += 1;
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(
                self.file_handle.index_id(),
                Some(file_size_hint),
                metrics.fulltext_index_apply_metrics.as_mut(),
            )
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => {
                RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
            }
            // The applier produced no result; leave `output` untouched.
            Ok(None) => continue,
            Err(err) => {
                // Panics in test builds, logs a warning otherwise.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                continue;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        pruned = true;
    }
    pruned
}
891
/// Prunes `output` with the inverted index.
///
/// For each applier a plan compatible with `sst_metadata` is built first;
/// appliers without a plan (or whose planning fails, which is logged) are
/// skipped. Returns `true` if at least one result was applied to `output`,
/// `false` if the file has no inverted index or nothing was applied.
///
/// When `skip_fields` is set only the first applier slot is used.
async fn prune_row_groups_by_inverted_index(
    &self,
    sst_metadata: &RegionMetadataRef,
    row_group_size: usize,
    num_row_groups: usize,
    output: &mut RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    skip_fields: bool,
) -> bool {
    if !self.file_handle.meta_ref().inverted_index_available() {
        return false;
    }

    let mut pruned = false;
    // Restrict to the first applier when fields are skipped.
    let appliers = if skip_fields {
        &self.inverted_index_appliers[..1]
    } else {
        &self.inverted_index_appliers[..]
    };
    for index_applier in appliers.iter().flatten() {
        // Skip this applier when no plan is available for this SST.
        let Ok(Some(plan)) = index_applier
            .plan_for_sst(sst_metadata)
            .inspect_err(|e| warn!(e; "failed to build compatible plan for sst"))
        else {
            continue;
        };

        let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
            let file_id = self.file_handle.file_id().file_id();
            cache.get(&plan.predicate_key, file_id)
        });

        // Reuse a cached result if it covers every row group still selected.
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
            metrics.inverted_index_cache_hit += 1;
            pruned = true;
            continue;
        }

        // Cache miss (or cached result insufficient): query the index.
        metrics.inverted_index_cache_miss += 1;
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(
                self.file_handle.index_id(),
                Some(file_size_hint),
                &plan.index_applier,
                metrics.inverted_index_apply_metrics.as_mut(),
            )
            .await;

        let selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                apply_output,
            ),
            Err(err) => {
                // Panics in test builds, logs a warning otherwise.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                continue;
            }
        };

        self.apply_index_result_and_update_cache(
            &plan.predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        pruned = true;
    }
    pruned
}
976
/// Prunes `output` with the bloom filter index.
///
/// Only row groups that are still selected in `output` and not already
/// covered by a cached result are searched; the fresh result is merged with
/// the cached one before it is applied. Returns `true` if at least one result
/// was applied to `output`, `false` if the file has no bloom filter index or
/// nothing was applied.
///
/// When `skip_fields` is set only the first applier slot is used.
async fn prune_row_groups_by_bloom_filter(
    &self,
    sst_metadata: &RegionMetadataRef,
    row_group_size: usize,
    parquet_meta: &ParquetMetaData,
    output: &mut RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    skip_fields: bool,
) -> bool {
    if !self.file_handle.meta_ref().bloom_filter_index_available() {
        return false;
    }

    let mut pruned = false;
    // Restrict to the first applier when fields are skipped.
    let appliers = if skip_fields {
        &self.bloom_filter_index_appliers[..1]
    } else {
        &self.bloom_filter_index_appliers[..]
    };
    for index_applier in appliers.iter().flatten() {
        // Skip appliers that have no predicates compatible with this SST.
        let Some(compatible_predicates) =
            index_applier.compatible_predicate_for_sst(sst_metadata)
        else {
            continue;
        };
        let predicate_key = PredicateKey::new_bloom(compatible_predicates.clone());
        let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
            let file_id = self.file_handle.file_id().file_id();
            cache.get(&predicate_key, file_id)
        });
        // Reuse a cached result if it covers every row group still selected.
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
            metrics.bloom_filter_cache_hit += 1;
            pruned = true;
            continue;
        }

        metrics.bloom_filter_cache_miss += 1;
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // Per row group: (row count, whether it must be searched). A row group
        // is searched only if it is still selected and the cached result (if
        // any) does not already contain it.
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(
                self.file_handle.index_id(),
                Some(file_size_hint),
                &compatible_predicates,
                rgs,
                metrics.bloom_filter_apply_metrics.as_mut(),
            )
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            Err(err) => {
                // Panics in test builds, logs a warning otherwise.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                continue;
            }
        };

        // Merge in the row groups that were answered from the cache.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            &predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        pruned = true;
    }
    pruned
}
1068
/// Prunes `output` with the vector index.
///
/// Does nothing unless both the vector applier and `k` are configured and the
/// file has a vector index. Failures while applying the index or converting
/// its row offsets are reported through `handle_index_error!` and leave
/// `output` untouched.
#[cfg(feature = "vector_index")]
async fn prune_row_groups_by_vector_index(
    &self,
    row_group_size: usize,
    num_row_groups: usize,
    output: &mut RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
) {
    let (Some(applier), Some(k)) = (&self.vector_index_applier, self.vector_index_k) else {
        return;
    };
    if !self.file_handle.meta_ref().vector_index_available() {
        return;
    }

    let index_size_hint = self.file_handle.meta_ref().index_file_size();
    let applied = applier
        .apply_with_k(self.file_handle.index_id(), Some(index_size_hint), k)
        .await;
    let converted = match applied {
        Ok(res) => vector_selection_from_offsets(res.row_offsets, row_group_size, num_row_groups),
        Err(err) => {
            handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
            return;
        }
    };
    let vector_selection = match converted {
        Ok(selection) => selection,
        Err(err) => {
            handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
            return;
        }
    };

    metrics.rows_vector_selected += vector_selection.row_count();
    apply_selection_and_update_metrics(output, &vector_selection, metrics, INDEX_TYPE_VECTOR);
}
1111
/// Prunes `output` with the fulltext index via the appliers' `apply_coarse`.
///
/// Only row groups that are still selected in `output` and not already
/// covered by a cached result are searched; the fresh result is merged with
/// the cached one before it is applied. Returns `true` if at least one result
/// was applied to `output`, `false` if the file has no fulltext index or
/// nothing was applied.
///
/// When `skip_fields` is set only the first applier slot is used.
async fn prune_row_groups_by_fulltext_bloom(
    &self,
    row_group_size: usize,
    parquet_meta: &ParquetMetaData,
    output: &mut RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    skip_fields: bool,
) -> bool {
    if !self.file_handle.meta_ref().fulltext_index_available() {
        return false;
    }

    let mut pruned = false;
    // Restrict to the first applier when fields are skipped.
    let appliers = if skip_fields {
        &self.fulltext_index_appliers[..1]
    } else {
        &self.fulltext_index_appliers[..]
    };
    for index_applier in appliers.iter().flatten() {
        let predicate_key = index_applier.predicate_key();
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
        // Reuse a cached result if it covers every row group still selected.
        if let Some(result) = cached.as_ref()
            && all_required_row_groups_searched(output, result)
        {
            apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
            metrics.fulltext_index_cache_hit += 1;
            pruned = true;
            continue;
        }

        metrics.fulltext_index_cache_miss += 1;
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // Per row group: (row count, whether it must be searched). A row group
        // is searched only if it is still selected and the cached result (if
        // any) does not already contain it.
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(
                self.file_handle.index_id(),
                Some(file_size_hint),
                rgs,
                metrics.fulltext_index_apply_metrics.as_mut(),
            )
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            // The applier produced no result; leave `output` untouched.
            Ok(None) => continue,
            Err(err) => {
                // Panics in test builds, logs a warning otherwise.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                continue;
            }
        };

        // Merge in the row groups that were answered from the cache.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id().file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        pruned = true;
    }
    pruned
}
1197
1198 fn row_groups_by_minmax(
1200 &self,
1201 read_format: &FlatReadFormat,
1202 parquet_meta: &ParquetMetaData,
1203 row_group_size: usize,
1204 total_row_count: usize,
1205 metrics: &mut ReaderFilterMetrics,
1206 skip_fields: bool,
1207 ) -> RowGroupSelection {
1208 let Some(predicate) = &self.predicate else {
1209 return RowGroupSelection::new(row_group_size, total_row_count);
1210 };
1211
1212 let file_id = self.file_handle.file_id().file_id();
1213 let index_result_cache = self.cache_strategy.index_result_cache();
1214 let cached_minmax_key =
1215 if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
1216 let mut exprs = predicate
1219 .exprs()
1220 .iter()
1221 .map(|expr| format!("{expr:?}"))
1222 .collect::<Vec<_>>();
1223 exprs.sort();
1224 let schema_version = self
1225 .expected_metadata
1226 .as_ref()
1227 .map(|meta| meta.schema_version)
1228 .unwrap_or_else(|| read_format.metadata().schema_version);
1229 Some(PredicateKey::new_minmax(
1230 Arc::new(exprs),
1231 schema_version,
1232 skip_fields,
1233 ))
1234 } else {
1235 None
1236 };
1237
1238 if let Some(index_result_cache) = index_result_cache
1239 && let Some(predicate_key) = cached_minmax_key.as_ref()
1240 {
1241 if let Some(result) = index_result_cache.get(predicate_key, file_id) {
1242 metrics.minmax_cache_hit += 1;
1243 let num_row_groups = parquet_meta.num_row_groups();
1244 metrics.rg_minmax_filtered +=
1245 num_row_groups.saturating_sub(result.row_group_count());
1246 return (*result).clone();
1247 }
1248
1249 metrics.minmax_cache_miss += 1;
1250 }
1251
1252 let region_meta = read_format.metadata();
1253 let row_groups = parquet_meta.row_groups();
1254 let stats = RowGroupPruningStats::new(
1255 row_groups,
1256 read_format,
1257 self.expected_metadata.clone(),
1258 skip_fields,
1259 );
1260 let prune_schema = self
1261 .expected_metadata
1262 .as_ref()
1263 .map(|meta| meta.schema.arrow_schema())
1264 .unwrap_or_else(|| region_meta.schema.arrow_schema());
1265
1266 let mask = predicate.prune_with_stats(&stats, prune_schema);
1270 let output = RowGroupSelection::from_full_row_group_ids(
1271 mask.iter()
1272 .enumerate()
1273 .filter_map(|(row_group, keep)| keep.then_some(row_group)),
1274 row_group_size,
1275 total_row_count,
1276 );
1277
1278 metrics.rg_minmax_filtered += parquet_meta
1279 .num_row_groups()
1280 .saturating_sub(output.row_group_count());
1281
1282 if let Some(index_result_cache) = index_result_cache
1283 && let Some(predicate_key) = cached_minmax_key
1284 {
1285 index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
1286 }
1287
1288 output
1289 }
1290
1291 fn apply_index_result_and_update_cache(
1292 &self,
1293 predicate_key: &PredicateKey,
1294 file_id: FileId,
1295 result: RowGroupSelection,
1296 output: &mut RowGroupSelection,
1297 metrics: &mut ReaderFilterMetrics,
1298 index_type: &str,
1299 ) {
1300 apply_selection_and_update_metrics(output, &result, metrics, index_type);
1301
1302 if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1303 index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1304 }
1305 }
1306}
1307
1308fn has_row_level_selection(selection: &RowGroupSelection, parquet_meta: &ParquetMetaData) -> bool {
1309 selection.iter().any(|(row_group_idx, row_selection)| {
1310 let Some(row_group) = parquet_meta.row_groups().get(*row_group_idx) else {
1311 return false;
1312 };
1313
1314 row_selection.row_count() != row_group.num_rows() as usize
1315 || row_selection.iter().any(|selector| selector.skip)
1316 })
1317}
1318
1319fn apply_selection_and_update_metrics(
1320 output: &mut RowGroupSelection,
1321 result: &RowGroupSelection,
1322 metrics: &mut ReaderFilterMetrics,
1323 index_type: &str,
1324) {
1325 let intersection = output.intersect(result);
1326
1327 let row_group_count = output.row_group_count() - intersection.row_group_count();
1328 let row_count = output.row_count() - intersection.row_count();
1329
1330 metrics.update_index_metrics(index_type, row_group_count, row_count);
1331
1332 *output = intersection;
1333}
1334
/// Builds a [`RowGroupSelection`] from the row offsets returned by the
/// vector index.
///
/// Offsets are deduplicated and ordered before being handed to
/// `RowGroupSelection::from_row_ids`.
///
/// # Errors
///
/// Returns an `ApplyVectorIndex` error for the first offset that does not
/// fit into a `u32`.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        // Stops at the first offset that is out of range.
        .collect::<Result<BTreeSet<u32>>>()?;

    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1357
1358fn all_required_row_groups_searched(
1359 required_row_groups: &RowGroupSelection,
1360 cached_row_groups: &RowGroupSelection,
1361) -> bool {
1362 required_row_groups.iter().all(|(rg_id, _)| {
1363 !required_row_groups.contains_non_empty_row_group(*rg_id)
1365 || cached_row_groups.contains_row_group(*rg_id)
1367 })
1368}
1369
1370#[derive(Debug, Default, Clone)]
1372pub(crate) struct ReaderFilterMetrics {
1373 pub(crate) rg_total: usize,
1375 pub(crate) rg_fulltext_filtered: usize,
1377 pub(crate) rg_inverted_filtered: usize,
1379 pub(crate) rg_minmax_filtered: usize,
1381 pub(crate) rg_bloom_filtered: usize,
1383 pub(crate) rg_vector_filtered: usize,
1385
1386 pub(crate) rows_total: usize,
1388 pub(crate) rows_fulltext_filtered: usize,
1390 pub(crate) rows_inverted_filtered: usize,
1392 pub(crate) rows_bloom_filtered: usize,
1394 pub(crate) rows_vector_filtered: usize,
1396 pub(crate) rows_vector_selected: usize,
1398 pub(crate) rows_precise_filtered: usize,
1400
1401 pub(crate) fulltext_index_cache_hit: usize,
1403 pub(crate) fulltext_index_cache_miss: usize,
1405 pub(crate) inverted_index_cache_hit: usize,
1407 pub(crate) inverted_index_cache_miss: usize,
1409 pub(crate) bloom_filter_cache_hit: usize,
1411 pub(crate) bloom_filter_cache_miss: usize,
1413 pub(crate) minmax_cache_hit: usize,
1415 pub(crate) minmax_cache_miss: usize,
1417
1418 pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
1420 pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
1422 pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
1424
1425 pub(crate) pruner_cache_hit: usize,
1427 pub(crate) pruner_cache_miss: usize,
1429 pub(crate) pruner_prune_cost: Duration,
1431 pub(crate) files_time_range_pruned: usize,
1433}
1434
1435impl ReaderFilterMetrics {
1436 pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1438 self.rg_total += other.rg_total;
1439 self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1440 self.rg_inverted_filtered += other.rg_inverted_filtered;
1441 self.rg_minmax_filtered += other.rg_minmax_filtered;
1442 self.rg_bloom_filtered += other.rg_bloom_filtered;
1443 self.rg_vector_filtered += other.rg_vector_filtered;
1444
1445 self.rows_total += other.rows_total;
1446 self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1447 self.rows_inverted_filtered += other.rows_inverted_filtered;
1448 self.rows_bloom_filtered += other.rows_bloom_filtered;
1449 self.rows_vector_filtered += other.rows_vector_filtered;
1450 self.rows_vector_selected += other.rows_vector_selected;
1451 self.rows_precise_filtered += other.rows_precise_filtered;
1452
1453 self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1454 self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1455 self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1456 self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1457 self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1458 self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1459 self.minmax_cache_hit += other.minmax_cache_hit;
1460 self.minmax_cache_miss += other.minmax_cache_miss;
1461
1462 self.pruner_cache_hit += other.pruner_cache_hit;
1463 self.pruner_cache_miss += other.pruner_cache_miss;
1464 self.pruner_prune_cost += other.pruner_prune_cost;
1465 self.files_time_range_pruned += other.files_time_range_pruned;
1466
1467 if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1469 self.inverted_index_apply_metrics
1470 .get_or_insert_with(Default::default)
1471 .merge_from(other_metrics);
1472 }
1473 if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1474 self.bloom_filter_apply_metrics
1475 .get_or_insert_with(Default::default)
1476 .merge_from(other_metrics);
1477 }
1478 if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1479 self.fulltext_index_apply_metrics
1480 .get_or_insert_with(Default::default)
1481 .merge_from(other_metrics);
1482 }
1483 }
1484
1485 pub(crate) fn observe(&self) {
1487 READ_ROW_GROUPS_TOTAL
1488 .with_label_values(&["before_filtering"])
1489 .inc_by(self.rg_total as u64);
1490 READ_ROW_GROUPS_TOTAL
1491 .with_label_values(&["fulltext_index_filtered"])
1492 .inc_by(self.rg_fulltext_filtered as u64);
1493 READ_ROW_GROUPS_TOTAL
1494 .with_label_values(&["inverted_index_filtered"])
1495 .inc_by(self.rg_inverted_filtered as u64);
1496 READ_ROW_GROUPS_TOTAL
1497 .with_label_values(&["minmax_index_filtered"])
1498 .inc_by(self.rg_minmax_filtered as u64);
1499 READ_ROW_GROUPS_TOTAL
1500 .with_label_values(&["bloom_filter_index_filtered"])
1501 .inc_by(self.rg_bloom_filtered as u64);
1502 READ_ROW_GROUPS_TOTAL
1503 .with_label_values(&["vector_index_filtered"])
1504 .inc_by(self.rg_vector_filtered as u64);
1505
1506 PRECISE_FILTER_ROWS_TOTAL
1507 .with_label_values(&["parquet"])
1508 .inc_by(self.rows_precise_filtered as u64);
1509 READ_ROWS_IN_ROW_GROUP_TOTAL
1510 .with_label_values(&["before_filtering"])
1511 .inc_by(self.rows_total as u64);
1512 READ_ROWS_IN_ROW_GROUP_TOTAL
1513 .with_label_values(&["fulltext_index_filtered"])
1514 .inc_by(self.rows_fulltext_filtered as u64);
1515 READ_ROWS_IN_ROW_GROUP_TOTAL
1516 .with_label_values(&["inverted_index_filtered"])
1517 .inc_by(self.rows_inverted_filtered as u64);
1518 READ_ROWS_IN_ROW_GROUP_TOTAL
1519 .with_label_values(&["bloom_filter_index_filtered"])
1520 .inc_by(self.rows_bloom_filtered as u64);
1521 READ_ROWS_IN_ROW_GROUP_TOTAL
1522 .with_label_values(&["vector_index_filtered"])
1523 .inc_by(self.rows_vector_filtered as u64);
1524 }
1525
1526 fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1527 match index_type {
1528 INDEX_TYPE_FULLTEXT => {
1529 self.rg_fulltext_filtered += row_group_count;
1530 self.rows_fulltext_filtered += row_count;
1531 }
1532 INDEX_TYPE_INVERTED => {
1533 self.rg_inverted_filtered += row_group_count;
1534 self.rows_inverted_filtered += row_count;
1535 }
1536 INDEX_TYPE_BLOOM => {
1537 self.rg_bloom_filtered += row_group_count;
1538 self.rows_bloom_filtered += row_count;
1539 }
1540 INDEX_TYPE_VECTOR => {
1541 self.rg_vector_filtered += row_group_count;
1542 self.rows_vector_filtered += row_count;
1543 }
1544 _ => {}
1545 }
1546 }
1547}
1548
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    /// Number of rows per row group used by all tests in this module.
    const ROW_GROUP_SIZE: usize = 4;

    /// Offsets spread over three row groups select one entry per row group.
    #[test]
    fn test_vector_selection_from_offsets() {
        let selection = vector_selection_from_offsets(vec![0, 1, 5, 9], ROW_GROUP_SIZE, 3).unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        for row_group in 0..3 {
            assert!(selection.contains_non_empty_row_group(row_group));
        }
    }

    /// An offset beyond `u32::MAX` is rejected.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let too_large = u64::from(u32::MAX) + 1;
        let selection = vector_selection_from_offsets(vec![0, 7, too_large], ROW_GROUP_SIZE, 2);
        assert!(selection.is_err());
    }

    /// Applying a vector selection updates the vector counters.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let mut output = RowGroupSelection::new(ROW_GROUP_SIZE, 8);
        let selection = vector_selection_from_offsets(vec![1], ROW_GROUP_SIZE, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1600
/// Counters describing how SST metadata was loaded.
#[derive(Default, Clone, Copy)]
pub struct MetadataCacheMetrics {
    /// Number of in-memory cache hits.
    pub mem_cache_hit: usize,
    /// Number of file cache hits.
    pub file_cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
    /// Accumulated time spent loading metadata.
    pub metadata_load_cost: Duration,
    /// Number of read operations performed.
    pub num_reads: usize,
    /// Number of bytes read.
    pub bytes_read: u64,
}

/// Renders the metrics as a compact JSON-like object.
///
/// Empty metrics (see `is_empty`) render as `{}`. Otherwise the load cost is
/// always printed and every other counter only when it is non-zero.
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            return f.write_str("{}");
        }

        // Destructure so that adding a field forces this impl to be revisited.
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *self;

        write!(f, "{{\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        // `usize` to `u64` is a lossless widening on supported targets.
        let counters = [
            ("mem_cache_hit", mem_cache_hit as u64),
            ("file_cache_hit", file_cache_hit as u64),
            ("cache_miss", cache_miss as u64),
            ("num_reads", num_reads as u64),
            ("bytes_read", bytes_read),
        ];
        for (name, value) in counters {
            if value > 0 {
                write!(f, ", \"{name}\":{value}")?;
            }
        }

        f.write_str("}")
    }
}

impl MetadataCacheMetrics {
    /// Returns `true` when no load cost has been recorded.
    ///
    /// Only `metadata_load_cost` is inspected; the other counters are not.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost == Duration::ZERO
    }

    /// Adds every counter of `other` to `self`.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        let MetadataCacheMetrics {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
            num_reads,
            bytes_read,
        } = *other;

        self.mem_cache_hit += mem_cache_hit;
        self.file_cache_hit += file_cache_hit;
        self.cache_miss += cache_miss;
        self.metadata_load_cost += metadata_load_cost;
        self.num_reads += num_reads;
        self.bytes_read += bytes_read;
    }
}
1672
1673#[derive(Debug, Default, Clone)]
1675pub struct ReaderMetrics {
1676 pub(crate) filter_metrics: ReaderFilterMetrics,
1678 pub(crate) build_cost: Duration,
1680 pub(crate) scan_cost: Duration,
1682 pub(crate) num_record_batches: usize,
1684 pub(crate) num_batches: usize,
1686 pub(crate) num_rows: usize,
1688 pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
1690 pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
1692 pub(crate) metadata_mem_size: isize,
1694 pub(crate) num_range_builders: isize,
1696}
1697
1698impl ReaderMetrics {
1699 pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1701 self.filter_metrics.merge_from(&other.filter_metrics);
1702 self.build_cost += other.build_cost;
1703 self.scan_cost += other.scan_cost;
1704 self.num_record_batches += other.num_record_batches;
1705 self.num_batches += other.num_batches;
1706 self.num_rows += other.num_rows;
1707 self.metadata_cache_metrics
1708 .merge_from(&other.metadata_cache_metrics);
1709 if let Some(other_fetch) = &other.fetch_metrics {
1710 if let Some(self_fetch) = &self.fetch_metrics {
1711 self_fetch.merge_from(other_fetch);
1712 } else {
1713 self.fetch_metrics = Some(other_fetch.clone());
1714 }
1715 }
1716 self.metadata_mem_size += other.metadata_mem_size;
1717 self.num_range_builders += other.num_range_builders;
1718 }
1719
1720 pub(crate) fn observe_rows(&self, read_type: &str) {
1722 READ_ROWS_TOTAL
1723 .with_label_values(&[read_type])
1724 .inc_by(self.num_rows as u64);
1725 }
1726}
1727
1728pub(crate) struct RowGroupReaderBuilder {
1730 file_handle: FileHandle,
1734 file_path: String,
1736 parquet_meta: Arc<ParquetMetaData>,
1738 arrow_metadata: ArrowReaderMetadata,
1740 output_schema: SchemaRef,
1742 object_store: ObjectStore,
1744 projection: ProjectionMaskPlan,
1746 has_nested_projection: bool,
1748 cache_strategy: CacheStrategy,
1750 prefilter_builder: Option<PrefilterContextBuilder>,
1752}
1753
1754pub(crate) struct RowGroupBuildContext<'a> {
1757 pub(crate) row_group_idx: usize,
1759 pub(crate) row_selection: Option<RowSelection>,
1761 pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
1763}
1764
1765impl RowGroupReaderBuilder {
1766 pub(crate) fn file_path(&self) -> &str {
1768 &self.file_path
1769 }
1770
1771 pub(crate) fn file_handle(&self) -> &FileHandle {
1773 &self.file_handle
1774 }
1775
1776 pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1777 &self.parquet_meta
1778 }
1779
1780 pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1781 &self.cache_strategy
1782 }
1783
1784 pub(crate) fn has_predicate_prefilter(&self) -> bool {
1785 self.prefilter_builder.is_some()
1786 }
1787
1788 pub(crate) async fn build(
1807 &self,
1808 build_ctx: RowGroupBuildContext<'_>,
1809 ) -> Result<ProjectedRecordBatchStream> {
1810 let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1811
1812 let Some(mut prefilter_ctx) = prefilter_ctx else {
1813 let stream = self
1815 .build_with_projection(
1816 build_ctx.row_group_idx,
1817 build_ctx.row_selection,
1818 self.projection.mask.clone(),
1819 build_ctx.fetch_metrics,
1820 )
1821 .await?;
1822 return self.make_projected_stream(stream);
1823 };
1824
1825 let prefilter_start = Instant::now();
1826 let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1827 if let Some(metrics) = build_ctx.fetch_metrics {
1828 let mut data = metrics.data.lock().unwrap();
1829 data.prefilter_cost += prefilter_start.elapsed();
1830 data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1831 }
1832
1833 let refined_selection = Some(prefilter_result.refined_selection);
1834
1835 let stream = self
1836 .build_with_projection(
1837 build_ctx.row_group_idx,
1838 refined_selection,
1839 self.projection.mask.clone(),
1840 build_ctx.fetch_metrics,
1841 )
1842 .await?;
1843 self.make_projected_stream(stream)
1844 }
1845
1846 fn make_projected_stream(
1847 &self,
1848 stream: ProjectedRecordBatchStream,
1849 ) -> Result<ProjectedRecordBatchStream> {
1850 if !self.has_nested_projection {
1851 return Ok(stream);
1852 }
1853
1854 Ok(NestedSchemaAligner::new(
1855 stream,
1856 self.projection.projected_root_presence.clone(),
1857 self.output_schema.clone(),
1858 )?
1859 .boxed())
1860 }
1861
1862 pub(crate) async fn build_with_projection(
1864 &self,
1865 row_group_idx: usize,
1866 row_selection: Option<RowSelection>,
1867 projection: ProjectionMask,
1868 fetch_metrics: Option<&ParquetFetchMetrics>,
1869 ) -> Result<ProjectedRecordBatchStream> {
1870 let range_fetcher = SstParquetRangeFetcher::new(
1871 self.file_handle.file_id(),
1872 self.file_path.clone(),
1873 self.object_store.clone(),
1874 self.cache_strategy.clone(),
1875 row_group_idx,
1876 fetch_metrics.cloned(),
1877 );
1878
1879 build_sst_parquet_record_batch_stream(
1880 self.arrow_metadata.clone(),
1881 row_group_idx,
1882 row_selection,
1883 projection,
1884 range_fetcher,
1885 self.file_path.clone(),
1886 DEFAULT_READ_BATCH_SIZE,
1887 )
1888 }
1889}
1890
1891#[derive(Clone)]
1892pub(crate) enum MaybeFilter {
1894 Filter(SimpleFilterEvaluator),
1896 Matched,
1898 Pruned,
1900}
1901
1902impl MaybeFilter {
1903 pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1905 match self {
1906 MaybeFilter::Filter(filter) => Some(filter),
1907 MaybeFilter::Matched | MaybeFilter::Pruned => None,
1908 }
1909 }
1910}
1911
1912#[derive(Clone)]
1913pub(crate) struct SimpleFilterContext {
1915 filter: MaybeFilter,
1917 expr_str: String,
1919 column_id: ColumnId,
1921 semantic_type: SemanticType,
1923}
1924
1925impl SimpleFilterContext {
1926 pub(crate) fn new_opt(
1931 sst_meta: &RegionMetadataRef,
1932 expected_meta: Option<&RegionMetadata>,
1933 expr: &Expr,
1934 ) -> Option<Self> {
1935 let filter = SimpleFilterEvaluator::try_new(expr)?;
1936 let expr_str = format!("{expr:?}");
1937 let (column_metadata, maybe_filter) = match expected_meta {
1938 Some(meta) => {
1939 let column = meta.column_by_name(filter.column_name())?;
1941 match sst_meta.column_by_id(column.column_id) {
1944 Some(sst_column) => {
1945 debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1946 let maybe_filter = if sst_column.column_schema.data_type
1951 == column.column_schema.data_type
1952 {
1953 MaybeFilter::Filter(filter)
1954 } else {
1955 debug_assert_eq!(column.semantic_type, SemanticType::Field);
1958 return None;
1959 };
1960 (column, maybe_filter)
1961 }
1962 None => {
1963 if pruned_by_default(&filter, column)? {
1967 (column, MaybeFilter::Pruned)
1968 } else {
1969 (column, MaybeFilter::Matched)
1970 }
1971 }
1972 }
1973 }
1974 None => {
1975 let column = sst_meta.column_by_name(filter.column_name())?;
1976 (column, MaybeFilter::Filter(filter))
1977 }
1978 };
1979
1980 Some(Self {
1981 filter: maybe_filter,
1982 expr_str,
1983 column_id: column_metadata.column_id,
1984 semantic_type: column_metadata.semantic_type,
1985 })
1986 }
1987
1988 pub(crate) fn filter(&self) -> &MaybeFilter {
1990 &self.filter
1991 }
1992
1993 pub(crate) fn expr_str(&self) -> &str {
1995 &self.expr_str
1996 }
1997
1998 pub(crate) fn column_id(&self) -> ColumnId {
2000 self.column_id
2001 }
2002
2003 pub(crate) fn semantic_type(&self) -> SemanticType {
2005 self.semantic_type
2006 }
2007}
2008
2009#[derive(Clone)]
2011pub(crate) struct PhysicalFilterContext {
2012 filter: Arc<dyn PhysicalExpr>,
2014 expr_str: String,
2016 column_id: ColumnId,
2018 column_name: String,
2020 semantic_type: SemanticType,
2022 schema: SchemaRef,
2024 immutable: bool,
2026}
2027
2028impl PhysicalFilterContext {
2029 pub(crate) fn new_opt(
2034 sst_meta: &RegionMetadataRef,
2035 expected_meta: Option<&RegionMetadata>,
2036 read_format: &FlatReadFormat,
2037 expr: &Expr,
2038 ) -> Option<Self> {
2039 if !Self::is_prefilter_candidate(expr) {
2040 return None;
2041 }
2042 let expr_str = format!("{expr:?}");
2043 let column_name = Self::single_column_name(expr)?;
2044 let column_metadata = match expected_meta {
2045 Some(meta) => {
2046 let column = meta.column_by_name(&column_name)?;
2047 let sst_column = sst_meta.column_by_id(column.column_id)?;
2048 if sst_column.column_schema.name != column_name {
2050 return None;
2051 }
2052 column
2053 }
2054 None => sst_meta.column_by_name(&column_name)?,
2055 };
2056
2057 let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
2060 let field = field.clone();
2061 let schema = Arc::new(ArrowSchema::new(vec![field]));
2062 let physical_expr = Predicate::to_physical_expr(expr, &schema)
2063 .inspect_err(|e| {
2064 error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
2065 })
2066 .ok()?;
2067 let immutable = expr_is_immutable(expr);
2068
2069 Some(Self {
2070 filter: physical_expr,
2071 expr_str,
2072 column_id: column_metadata.column_id,
2073 column_name,
2074 semantic_type: column_metadata.semantic_type,
2075 schema,
2076 immutable,
2077 })
2078 }
2079
2080 fn is_prefilter_candidate(expr: &Expr) -> bool {
2085 if !matches!(
2086 expr,
2087 Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
2088 ) {
2089 return false;
2090 }
2091
2092 !expr
2095 .exists(|e| Ok(matches!(e, Expr::ScalarFunction(_))))
2096 .unwrap_or(false)
2097 }
2098
2099 fn single_column_name(expr: &Expr) -> Option<String> {
2100 let mut columns = HashSet::new();
2101 if expr_to_columns(expr, &mut columns).is_err() {
2102 return None;
2103 }
2104 if columns.len() != 1 {
2105 return None;
2106 }
2107 columns.iter().next().map(|column| column.name.clone())
2108 }
2109
2110 pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
2112 &self.filter
2113 }
2114
2115 pub(crate) fn expr_str(&self) -> &str {
2117 &self.expr_str
2118 }
2119
2120 pub(crate) fn column_id(&self) -> ColumnId {
2122 self.column_id
2123 }
2124
2125 pub(crate) fn column_name(&self) -> &str {
2127 &self.column_name
2128 }
2129
2130 pub(crate) fn semantic_type(&self) -> SemanticType {
2132 self.semantic_type
2133 }
2134
2135 pub(crate) fn schema(&self) -> &SchemaRef {
2137 &self.schema
2138 }
2139
2140 pub(crate) fn is_immutable(&self) -> bool {
2142 self.immutable
2143 }
2144}
2145
2146fn expr_is_immutable(expr: &Expr) -> bool {
2147 let mut is_immutable = true;
2148 let _ = expr.apply(|expr| match expr {
2149 Expr::ScalarFunction(function)
2150 if function.func.signature().volatility != Volatility::Immutable =>
2151 {
2152 is_immutable = false;
2153 Ok(TreeNodeRecursion::Stop)
2154 }
2155 Expr::ScalarVariable(_, _) => {
2156 is_immutable = false;
2157 Ok(TreeNodeRecursion::Stop)
2158 }
2159 _ => Ok(TreeNodeRecursion::Continue),
2160 });
2161 is_immutable
2162}
2163
2164fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
2167 let value = column.column_schema.create_default().ok().flatten()?;
2168 let scalar_value = value
2169 .try_to_scalar_value(&column.column_schema.data_type)
2170 .ok()?;
2171 let matches = filter.evaluate_scalar(&scalar_value).ok()?;
2172 Some(!matches)
2173}
2174
2175pub struct ParquetReader {
2177 context: FileRangeContextRef,
2179 selection: RowGroupSelection,
2181 reader: Option<FlatPruneReader>,
2183 fetch_metrics: ParquetFetchMetrics,
2185}
2186
2187impl ParquetReader {
2188 #[tracing::instrument(
2189 skip_all,
2190 fields(
2191 region_id = %self.context.reader_builder().file_handle.region_id(),
2192 file_id = %self.context.reader_builder().file_handle.file_id()
2193 )
2194 )]
2195 pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2196 loop {
2197 if let Some(reader) = &mut self.reader {
2198 if let Some(batch) = reader.next_batch().await? {
2199 return Ok(Some(batch));
2200 }
2201 self.reader = None;
2202 continue;
2203 }
2204
2205 let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
2206 return Ok(None);
2207 };
2208
2209 let skip_fields = self.context.pre_filter_mode().skip_fields();
2210 let parquet_reader = self
2211 .context
2212 .reader_builder()
2213 .build(self.context.build_context(
2214 row_group_idx,
2215 Some(row_selection),
2216 Some(&self.fetch_metrics),
2217 ))
2218 .await?;
2219 self.reader = Some(FlatPruneReader::new_with_row_group_reader(
2220 self.context.clone(),
2221 FlatRowGroupReader::new(self.context.clone(), parquet_reader),
2222 skip_fields,
2223 ));
2224 }
2225 }
2226 #[tracing::instrument(
2228 skip_all,
2229 fields(
2230 region_id = %context.reader_builder().file_handle.region_id(),
2231 file_id = %context.reader_builder().file_handle.file_id()
2232 )
2233 )]
2234 pub(crate) async fn new(
2235 context: FileRangeContextRef,
2236 mut selection: RowGroupSelection,
2237 ) -> Result<Self> {
2238 let fetch_metrics = ParquetFetchMetrics::default();
2239 let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
2240 let skip_fields = context.pre_filter_mode().skip_fields();
2241 let parquet_reader = context
2242 .reader_builder()
2243 .build(context.build_context(
2244 row_group_idx,
2245 Some(row_selection),
2246 Some(&fetch_metrics),
2247 ))
2248 .await?;
2249 Some(FlatPruneReader::new_with_row_group_reader(
2250 context.clone(),
2251 FlatRowGroupReader::new(context.clone(), parquet_reader),
2252 skip_fields,
2253 ))
2254 } else {
2255 None
2256 };
2257
2258 Ok(ParquetReader {
2259 context,
2260 selection,
2261 reader,
2262 fetch_metrics,
2263 })
2264 }
2265
2266 pub fn metadata(&self) -> &RegionMetadataRef {
2268 self.context.read_format().metadata()
2269 }
2270
2271 pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2272 self.context.reader_builder().parquet_meta.clone()
2273 }
2274}
2275
2276pub(crate) struct FlatRowGroupReader {
2278 context: FileRangeContextRef,
2280 stream: ProjectedRecordBatchStream,
2282 override_sequence: Option<ArrayRef>,
2284}
2285
2286impl FlatRowGroupReader {
2287 pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2289 let override_sequence = context
2291 .read_format()
2292 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2293
2294 Self {
2295 context,
2296 stream,
2297 override_sequence,
2298 }
2299 }
2300
2301 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2303 match self.stream.next().await {
2304 Some(batch_result) => {
2305 let record_batch = batch_result?;
2306
2307 let record_batch = self
2308 .context
2309 .read_format()
2310 .convert_batch(record_batch, self.override_sequence.as_ref())?;
2311 Ok(Some(record_batch))
2312 }
2313 None => Ok(None),
2314 }
2315 }
2316}
2317
2318#[cfg(test)]
2319mod tests {
2320 use std::any::Any;
2321 use std::fmt::{Debug, Formatter};
2322 use std::sync::{Arc, LazyLock};
2323
2324 use common_error::ext::WhateverResult;
2325 use common_function::scalars::json::json_get::JsonGetWithType;
2326 use common_function::scalars::udf::create_udf;
2327 use common_recordbatch::ext::RecordBatchExt;
2328 use datafusion::arrow::datatypes::DataType;
2329 use datafusion_common::ScalarValue;
2330 use datafusion_expr::expr::ScalarFunction;
2331 use datafusion_expr::{
2332 ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2333 col, lit,
2334 };
2335 use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, StructArray};
2336 use datatypes::arrow::datatypes::{Fields, Schema};
2337 use datatypes::arrow::record_batch::RecordBatch;
2338 use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
2339 use datatypes::prelude::ConcreteDataType;
2340 use datatypes::schema::ColumnSchema;
2341 use object_store::services::Memory;
2342 use parquet::arrow::ArrowWriter;
2343 use parquet::file::properties::WriterProperties;
2344 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2345 use store_api::region_request::PathType;
2346 use store_api::storage::RegionId;
2347 use table::predicate::Predicate;
2348
2349 use super::*;
2350 use crate::sst::parquet::metadata::MetadataLoader;
2351 use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
2352 use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2353
2354 #[test]
2355 fn test_skip_prefilter_for_json_get() -> WhateverResult<()> {
2356 fn json_get_expr(base: Expr, path: &str) -> Expr {
2357 let json_get = Arc::new(create_udf(Arc::new(JsonGetWithType::default())));
2358 Expr::ScalarFunction(ScalarFunction::new_udf(json_get, vec![base, lit(path)]))
2359 }
2360
2361 let metadata = Arc::new(sst_region_metadata());
2362 let format = FlatReadFormat::new(
2363 metadata.clone(),
2364 ReadColumns::from_deduped_column_ids(
2365 metadata.column_metadatas.iter().map(|c| c.column_id),
2366 ),
2367 None,
2368 "test",
2369 true,
2370 )?;
2371 let new_filter =
2372 |expr: Expr| PhysicalFilterContext::new_opt(&metadata, None, &format, &expr);
2373
2374 let json_get = || json_get_expr(col("field_0"), "a.b");
2375
2376 let regular_expr = col("field_0").is_null();
2377 assert!(new_filter(regular_expr).is_some());
2378
2379 let is_null = json_get().is_null();
2380 assert!(new_filter(is_null).is_none());
2381
2382 let is_not_null = json_get().is_not_null();
2383 assert!(new_filter(is_not_null).is_none());
2384
2385 let in_list = json_get().in_list(vec![lit("value")], false);
2386 assert!(new_filter(in_list).is_none());
2387
2388 let in_list_nested = col("field_0").in_list(vec![json_get()], false);
2389 assert!(new_filter(in_list_nested).is_none());
2390
2391 let between = json_get().between(lit(1_u64), lit(10_u64));
2392 assert!(new_filter(between).is_none());
2393
2394 let between_nested = col("field_0").between(json_get(), lit(10_u64));
2395 assert!(new_filter(between_nested).is_none());
2396
2397 Ok(())
2398 }
2399
2400 #[tokio::test]
2401 async fn test_nested_projection_reads_partial_json2_physical_fields() -> WhateverResult<()> {
2402 let xy_fields = Fields::from(vec![
2408 Arc::new(Field::new("x", DataType::Int64, true)),
2409 Arc::new(Field::new("y", DataType::Utf8, true)),
2410 ]);
2411 let a_field = Arc::new(Field::new("a", DataType::Struct(xy_fields.clone()), true));
2412 let b_field = Arc::new(Field::new("b", DataType::Utf8, true));
2413 let json_fields = Fields::from(vec![a_field, b_field]);
2414 let json_field = Field::new("j", DataType::Struct(json_fields.clone()), true)
2415 .with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default())));
2416 let schema = Arc::new(Schema::new(vec![json_field]));
2417
2418 let a_array = Arc::new(StructArray::new(
2419 xy_fields,
2420 vec![
2421 Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef,
2422 Arc::new(StringArray::from_iter_values(["x1", "x2", "x3"])) as ArrayRef,
2423 ],
2424 None,
2425 )) as ArrayRef;
2426 let b_array = Arc::new(StringArray::from_iter_values(["b1", "b2", "b3"])) as ArrayRef;
2427 let j_array =
2428 Arc::new(StructArray::new(json_fields, vec![a_array, b_array], None)) as ArrayRef;
2429 let columns = vec![j_array];
2430
2431 let batch = RecordBatch::try_new(schema, columns).map_err(|e| e.to_string())?;
2432
2433 let object_store = ObjectStore::new(Memory::default())
2437 .map_err(|e| e.to_string())?
2438 .finish();
2439 let file_handle = sst_file_handle(0, 1);
2440 let file_path = file_handle.file_path("test_table", PathType::Bare);
2441
2442 let mut parquet_bytes = Vec::new();
2443 ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None)
2444 .and_then(|mut w| {
2445 w.write(&batch)?;
2446 Ok(w)
2447 })
2448 .and_then(|w| w.close())
2449 .map_err(|e| e.to_string())?;
2450 let file_size = parquet_bytes.len() as u64;
2451 object_store
2452 .write(&file_path, parquet_bytes)
2453 .await
2454 .map_err(|e| e.to_string())?;
2455
2456 let mut cache_metrics = MetadataCacheMetrics::default();
2457 let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2458 let parquet_meta = loader.load(&mut cache_metrics).await?;
2459 let parquet_schema = parquet_meta.file_metadata().schema_descr();
2460 assert_eq!(3, parquet_schema.num_columns());
2461
2462 let projection =
2466 ParquetReadColumns::from_deduped(vec![ParquetReadColumn::new(0).with_nested_paths(
2467 vec![vec!["j".to_string(), "a".to_string(), "x".to_string()]],
2468 )]);
2469 let projection_plan = build_projection_plan(&projection, parquet_schema);
2470 assert_eq!(vec![true], projection_plan.projected_root_presence);
2471 assert_eq!(
2472 projection_plan.mask,
2473 ProjectionMask::leaves(parquet_schema, vec![0])
2474 );
2475
2476 let arrow_metadata =
2479 ArrowReaderMetadata::try_new(Arc::new(parquet_meta), ArrowReaderOptions::new())
2480 .map_err(|e| e.to_string())?;
2481 let fetcher = SstParquetRangeFetcher::new(
2482 file_handle.file_id(),
2483 file_path.clone(),
2484 object_store,
2485 CacheStrategy::Disabled,
2486 0,
2487 None,
2488 );
2489 let mut stream = build_sst_parquet_record_batch_stream(
2490 arrow_metadata,
2491 0,
2492 None,
2493 projection_plan.mask,
2494 fetcher,
2495 file_path,
2496 1024,
2497 )?;
2498
2499 let Some(batch) = stream.next().await.transpose()? else {
2500 unreachable!()
2501 };
2502 let expected = r#"
2503+-------------+
2504| j |
2505+-------------+
2506| {a: {x: 1}} |
2507| {a: {x: 2}} |
2508| {a: {x: 3}} |
2509+-------------+
2510"#;
2511 assert_eq!(batch.pretty_print(), expected.trim());
2512 Ok(())
2513 }
2514
2515 #[tokio::test(flavor = "current_thread")]
2516 async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2517 #[derive(Eq, PartialEq, Hash)]
2518 struct PanicDebugUdf;
2519
2520 impl Debug for PanicDebugUdf {
2521 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2522 panic!("minmax predicate key should not format exprs when cache is disabled");
2523 }
2524 }
2525
2526 impl ScalarUDFImpl for PanicDebugUdf {
2527 fn as_any(&self) -> &dyn Any {
2528 self
2529 }
2530
2531 fn name(&self) -> &str {
2532 "panic_debug_udf"
2533 }
2534
2535 fn signature(&self) -> &Signature {
2536 static SIGNATURE: LazyLock<Signature> =
2537 LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2538 &SIGNATURE
2539 }
2540
2541 fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2542 Ok(DataType::Int64)
2543 }
2544
2545 fn invoke_with_args(
2546 &self,
2547 _args: ScalarFunctionArgs,
2548 ) -> datafusion_common::Result<ColumnarValue> {
2549 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2550 }
2551 }
2552
2553 let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2554 let file_handle = sst_file_handle(0, 1);
2555 let table_dir = "test_table".to_string();
2556 let path_type = PathType::Bare;
2557 let file_path = file_handle.file_path(&table_dir, path_type);
2558
2559 let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2560 let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2561 let mut parquet_bytes = Vec::new();
2562 let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2563 writer.write(&batch).unwrap();
2564 writer.close().unwrap();
2565 let file_size = parquet_bytes.len() as u64;
2566 object_store.write(&file_path, parquet_bytes).await.unwrap();
2567
2568 let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2569 let read_format = FlatReadFormat::new(
2570 region_metadata.clone(),
2571 ReadColumns::from_deduped_column_ids(
2572 region_metadata
2573 .column_metadatas
2574 .iter()
2575 .map(|column| column.column_id),
2576 ),
2577 None,
2578 &file_path,
2579 false,
2580 )
2581 .unwrap();
2582
2583 let mut cache_metrics = MetadataCacheMetrics::default();
2584 let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2585 let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2586
2587 let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2588 let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2589 udf,
2590 vec![],
2591 ))]);
2592 let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2593 .predicate(Some(predicate))
2594 .cache(CacheStrategy::Disabled);
2595
2596 let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2597 let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2598 let mut metrics = ReaderFilterMetrics::default();
2599 let selection = builder.row_groups_by_minmax(
2600 &read_format,
2601 &parquet_meta,
2602 row_group_size,
2603 total_row_count,
2604 &mut metrics,
2605 false,
2606 );
2607
2608 assert!(!selection.is_empty());
2609 }
2610
2611 #[test]
2612 fn test_expr_is_immutable_checks_scalar_function_volatility() {
2613 #[derive(Debug, PartialEq, Eq, Hash)]
2614 struct TestVolatilityUdf {
2615 name: String,
2616 signature: Signature,
2617 }
2618
2619 impl TestVolatilityUdf {
2620 fn new(name: &str, volatility: Volatility) -> Self {
2621 Self {
2622 name: name.to_string(),
2623 signature: Signature::variadic_any(volatility),
2624 }
2625 }
2626 }
2627
2628 impl ScalarUDFImpl for TestVolatilityUdf {
2629 fn as_any(&self) -> &dyn Any {
2630 self
2631 }
2632
2633 fn name(&self) -> &str {
2634 &self.name
2635 }
2636
2637 fn signature(&self) -> &Signature {
2638 &self.signature
2639 }
2640
2641 fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2642 Ok(DataType::Int64)
2643 }
2644
2645 fn invoke_with_args(
2646 &self,
2647 _args: ScalarFunctionArgs,
2648 ) -> datafusion_common::Result<ColumnarValue> {
2649 Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2650 }
2651 }
2652
2653 let expr = |name: &str, volatility| {
2654 Expr::ScalarFunction(ScalarFunction::new_udf(
2655 Arc::new(ScalarUDF::new_from_impl(TestVolatilityUdf::new(
2656 name, volatility,
2657 ))),
2658 vec![],
2659 ))
2660 };
2661
2662 assert!(expr_is_immutable(&expr(
2663 "immutable_udf",
2664 Volatility::Immutable
2665 )));
2666 assert!(!expr_is_immutable(&expr("stable_udf", Volatility::Stable)));
2667 assert!(!expr_is_immutable(&expr(
2668 "volatile_udf",
2669 Volatility::Volatile
2670 )));
2671
2672 let scalar_variable = Expr::ScalarVariable(
2673 Arc::new(Field::new("@@version", DataType::Utf8, false)),
2674 vec!["@@version".to_string()],
2675 );
2676 assert!(!expr_is_immutable(&scalar_variable));
2677 }
2678
2679 #[tokio::test(flavor = "current_thread")]
2680 async fn test_has_row_level_selection() {
2681 let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2682 let file_path = "row_level_selection.parquet";
2683
2684 let col = Arc::new(Int64Array::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef;
2685 let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2686 let props = WriterProperties::builder()
2687 .set_max_row_group_row_count(Some(3))
2688 .build();
2689 let mut parquet_bytes = Vec::new();
2690 let mut writer =
2691 ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
2692 writer.write(&batch).unwrap();
2693 writer.close().unwrap();
2694 let file_size = parquet_bytes.len() as u64;
2695 object_store.write(file_path, parquet_bytes).await.unwrap();
2696
2697 let mut cache_metrics = MetadataCacheMetrics::default();
2698 let loader = MetadataLoader::new(object_store, file_path, file_size);
2699 let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2700 assert_eq!(2, parquet_meta.num_row_groups());
2701
2702 let full_row_groups = RowGroupSelection::from_full_row_group_ids([0, 1], 3, 5);
2703 assert!(!has_row_level_selection(&full_row_groups, &parquet_meta));
2704
2705 let prefix_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![0..1, 1..2])], 3);
2706 assert!(has_row_level_selection(&prefix_selection, &parquet_meta));
2707
2708 let interior_selection = RowGroupSelection::from_row_ranges(vec![(0, vec![1..2, 2..3])], 3);
2709 assert!(has_row_level_selection(&interior_selection, &parquet_meta));
2710 }
2711
2712 fn expected_metadata_with_reused_tag_name(
2713 old_metadata: &RegionMetadata,
2714 ) -> Arc<RegionMetadata> {
2715 let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2716 builder
2717 .push_column_metadata(ColumnMetadata {
2718 column_schema: ColumnSchema::new(
2719 "tag_0".to_string(),
2720 ConcreteDataType::string_datatype(),
2721 true,
2722 ),
2723 semantic_type: SemanticType::Tag,
2724 column_id: 10,
2725 })
2726 .push_column_metadata(ColumnMetadata {
2727 column_schema: ColumnSchema::new(
2728 "tag_1".to_string(),
2729 ConcreteDataType::string_datatype(),
2730 true,
2731 ),
2732 semantic_type: SemanticType::Tag,
2733 column_id: 1,
2734 })
2735 .push_column_metadata(ColumnMetadata {
2736 column_schema: ColumnSchema::new(
2737 "field_0".to_string(),
2738 ConcreteDataType::uint64_datatype(),
2739 true,
2740 ),
2741 semantic_type: SemanticType::Field,
2742 column_id: 2,
2743 })
2744 .push_column_metadata(ColumnMetadata {
2745 column_schema: ColumnSchema::new(
2746 "ts".to_string(),
2747 ConcreteDataType::timestamp_millisecond_datatype(),
2748 false,
2749 ),
2750 semantic_type: SemanticType::Timestamp,
2751 column_id: 3,
2752 })
2753 .primary_key(vec![10, 1]);
2754
2755 Arc::new(builder.build().unwrap())
2756 }
2757
2758 #[test]
2759 fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
2760 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2761 let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2762 let ctx = SimpleFilterContext::new_opt(
2763 &metadata,
2764 Some(expected_metadata.as_ref()),
2765 &col("tag_0").eq(lit("a")),
2766 )
2767 .unwrap();
2768 assert!(matches!(
2769 ctx.filter(),
2770 MaybeFilter::Matched | MaybeFilter::Pruned
2771 ));
2772 }
2773
2774 #[test]
2775 fn test_simple_filter_context_drops_mismatched_field_filter() {
2776 let (sst_metadata, latest_metadata) = mock_metadata();
2777 let ctx = SimpleFilterContext::new_opt(
2778 &sst_metadata,
2779 Some(latest_metadata.as_ref()),
2780 &col("field_0").eq(lit(1_i64)),
2781 );
2782
2783 assert!(ctx.is_none());
2784 }
2785
2786 fn mock_metadata() -> (RegionMetadataRef, RegionMetadataRef) {
2787 let region_id = RegionId::new(1, 1);
2788 let make_tag_0 = || ColumnMetadata {
2789 column_schema: ColumnSchema::new(
2790 "tag_0".to_string(),
2791 ConcreteDataType::string_datatype(),
2792 true,
2793 ),
2794 semantic_type: SemanticType::Tag,
2795 column_id: 0,
2796 };
2797 let make_ts = || ColumnMetadata {
2798 column_schema: ColumnSchema::new(
2799 "ts".to_string(),
2800 ConcreteDataType::timestamp_millisecond_datatype(),
2801 false,
2802 ),
2803 semantic_type: SemanticType::Timestamp,
2804 column_id: 2,
2805 };
2806 let make_field_0 = |data_type| ColumnMetadata {
2807 column_schema: ColumnSchema::new("field_0".to_string(), data_type, true),
2808 semantic_type: SemanticType::Field,
2809 column_id: 1,
2810 };
2811
2812 let mut sst_builder = RegionMetadataBuilder::new(region_id);
2813 sst_builder
2814 .push_column_metadata(make_tag_0())
2815 .push_column_metadata(make_field_0(ConcreteDataType::uint64_datatype()))
2816 .push_column_metadata(make_ts())
2817 .primary_key(vec![0]);
2818 let sst_metadata = Arc::new(sst_builder.build().unwrap());
2819
2820 let mut expected_builder = RegionMetadataBuilder::new(region_id);
2821 expected_builder
2822 .push_column_metadata(make_tag_0())
2823 .push_column_metadata(make_field_0(ConcreteDataType::int64_datatype()))
2824 .push_column_metadata(make_ts())
2825 .primary_key(vec![0]);
2826
2827 let expected_metadata = Arc::new(expected_builder.build().unwrap());
2828
2829 (sst_metadata, expected_metadata)
2830 }
2831
2832 #[test]
2833 fn test_physical_filter_context_skips_renamed_column() {
2834 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2835 let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2836 let read_format = FlatReadFormat::new(
2837 metadata.clone(),
2838 ReadColumns::from_deduped_column_ids(
2839 metadata.column_metadatas.iter().map(|c| c.column_id),
2840 ),
2841 None,
2842 "test",
2843 true,
2844 )
2845 .unwrap();
2846
2847 let ctx = PhysicalFilterContext::new_opt(
2848 &metadata,
2849 Some(expected_metadata.as_ref()),
2850 &read_format,
2851 &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
2852 );
2853
2854 assert!(ctx.is_none());
2855 }
2856
2857 #[test]
2858 fn test_physical_filter_context_only_accepts_prefilter_candidates() {
2859 let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2860 let read_format = FlatReadFormat::new(
2861 metadata.clone(),
2862 ReadColumns::from_deduped_column_ids(
2863 metadata.column_metadatas.iter().map(|c| c.column_id),
2864 ),
2865 None,
2866 "test",
2867 true,
2868 )
2869 .unwrap();
2870
2871 let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
2873 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());
2874
2875 let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
2877 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, ¬_in).is_some());
2878
2879 let is_null = col("tag_0").is_null();
2881 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
2882 let is_not_null = col("tag_0").is_not_null();
2883 assert!(
2884 PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
2885 );
2886
2887 let between = col("field_0").between(lit(1_u64), lit(10_u64));
2889 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());
2890
2891 let binary = col("tag_0").eq(lit("a"));
2893 assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
2894 }
2895
2896 fn write_test_parquet_with_pk_column(values: &[&[u8]]) -> bytes::Bytes {
2897 use datatypes::arrow::array::{
2898 BinaryArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
2899 };
2900 use datatypes::arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
2901 use store_api::storage::consts::{
2902 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
2903 };
2904
2905 let n = values.len();
2906 let schema = Arc::new(ArrowSchema::new(vec![
2907 ArrowField::new(
2908 "ts",
2909 DataType::Timestamp(TimeUnit::Millisecond, None),
2910 false,
2911 ),
2912 ArrowField::new(PRIMARY_KEY_COLUMN_NAME, DataType::Binary, false),
2913 ArrowField::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
2914 ArrowField::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
2915 ]));
2916 let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0_i64; n]));
2917 let pk: ArrayRef = Arc::new(BinaryArray::from_iter_values(values.iter().copied()));
2918 let seq: ArrayRef = Arc::new(UInt64Array::from(vec![0_u64; n]));
2919 let op: ArrayRef = Arc::new(UInt8Array::from(vec![0_u8; n]));
2920 let batch = RecordBatch::try_new(schema.clone(), vec![ts, pk, seq, op]).unwrap();
2921
2922 let mut bytes = Vec::new();
2923 let mut writer = ArrowWriter::try_new(&mut bytes, schema, None).unwrap();
2924 writer.write(&batch).unwrap();
2925 writer.close().unwrap();
2926 bytes::Bytes::from(bytes)
2927 }
2928
2929 fn load_parquet_meta(bytes: bytes::Bytes) -> Arc<ParquetMetaData> {
2930 let builder =
2931 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
2932 builder.metadata().clone()
2933 }
2934
2935 #[test]
2936 fn test_should_read_pk_as_binary_small_chunk_returns_false() {
2937 let bytes = write_test_parquet_with_pk_column(&[b"a", b"b", b"c"]);
2938 let meta = load_parquet_meta(bytes);
2939
2940 assert!(!should_read_pk_as_binary_with_limit(&meta, 1024));
2941 }
2942
2943 #[test]
2944 fn test_should_read_pk_as_binary_large_chunk_returns_true() {
2945 let owned: Vec<Vec<u8>> = (0..512u32)
2946 .map(|i| {
2947 let mut v = vec![0u8; 16];
2948 v[..4].copy_from_slice(&i.to_le_bytes());
2949 v
2950 })
2951 .collect();
2952 let refs: Vec<&[u8]> = owned.iter().map(|v| v.as_slice()).collect();
2953 let bytes = write_test_parquet_with_pk_column(&refs);
2954 let meta = load_parquet_meta(bytes);
2955
2956 assert!(should_read_pk_as_binary_with_limit(&meta, 1024));
2957 }
2958}