1mod stream;
18
19#[cfg(feature = "vector_index")]
20use std::collections::BTreeSet;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{error, tracing, warn};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion_expr::Expr;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow::array::ArrayRef;
32use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::data_type::ConcreteDataType;
35use datatypes::prelude::DataType;
36use futures::StreamExt;
37use mito_codec::row_converter::build_primary_key_codec;
38use object_store::ObjectStore;
39use parquet::arrow::ProjectionMask;
40use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
41use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
42use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
43use partition::expr::PartitionExpr;
44use snafu::ResultExt;
45use store_api::codec::PrimaryKeyEncoding;
46use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
47use store_api::region_request::PathType;
48use store_api::storage::{ColumnId, FileId};
49use table::predicate::Predicate;
50
51use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream};
52use crate::cache::index::result_cache::PredicateKey;
53use crate::cache::{CacheStrategy, CachedSstMeta};
54#[cfg(feature = "vector_index")]
55use crate::error::ApplyVectorIndexSnafu;
56use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
57use crate::metrics::{
58 PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
59 READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
60};
61use crate::read::flat_projection::CompactionProjectionMapper;
62use crate::read::prune::FlatPruneReader;
63use crate::read::read_columns::ReadColumns;
64use crate::sst::file::FileHandle;
65use crate::sst::index::bloom_filter::applier::{
66 BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
67};
68use crate::sst::index::fulltext_index::applier::{
69 FulltextIndexApplierRef, FulltextIndexApplyMetrics,
70};
71use crate::sst::index::inverted_index::applier::{
72 InvertedIndexApplierRef, InvertedIndexApplyMetrics,
73};
74#[cfg(feature = "vector_index")]
75use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
76use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
77use crate::sst::parquet::async_reader::SstAsyncFileReader;
78use crate::sst::parquet::file_range::{
79 FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
80};
81use crate::sst::parquet::flat_format::FlatReadFormat;
82use crate::sst::parquet::format::need_override_sequence;
83use crate::sst::parquet::metadata::MetadataLoader;
84use crate::sst::parquet::prefilter::{
85 PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
86};
87use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
88use crate::sst::parquet::row_group::ParquetFetchMetrics;
89use crate::sst::parquet::row_selection::RowGroupSelection;
90use crate::sst::parquet::stats::RowGroupPruningStats;
91use crate::sst::tag_maybe_to_dictionary_field;
92
// Index type labels. Used both in `handle_index_error!` messages and as the
// `index_type` tag passed to `ReaderFilterMetrics::update_index_metrics`.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";
97
/// Handles an error returned while applying an index to prune row groups.
///
/// Index application is best-effort: in production builds the error is logged
/// as a warning and the caller continues without that index's result (every
/// call site `continue`s or returns after invoking this macro). Under
/// `cfg(test)` or the `test` feature the macro panics instead, so index
/// failures surface immediately in tests rather than silently degrading to a
/// full scan.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
118
/// Builder that configures and constructs a parquet SST reader
/// ([`ParquetReader`]) for a single file, including predicate pushdown,
/// projection, caching, and index-based row-group pruning.
pub struct ParquetReaderBuilder {
    /// Directory of the table; combined with `path_type` to resolve the file path.
    table_dir: String,
    /// Path layout used to build the SST file path.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store to read the file (and download it in the background on cache miss).
    object_store: ObjectStore,
    /// Optional predicate used for min-max pruning and precise filtering.
    predicate: Option<Predicate>,
    /// Columns to read; `None` reads all columns of the (expected) metadata.
    read_cols: Option<ReadColumns>,
    /// Cache for parquet metadata, index results, and file downloads.
    cache_strategy: CacheStrategy,
    // Each applier array has two slots; when `pre_filter_mode.skip_fields()` is
    // true only the first slot is applied (see the `prune_row_groups_by_*`
    // methods). Presumably slot 0 covers non-field columns and slot 1 covers
    // fields — confirm with the callers that populate these arrays.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier for vector (ANN) index search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Top-k parameter for vector index search; `None` disables it.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Metadata of the region as the caller expects it (may be newer than the
    /// metadata stored in the SST); used for pruning and schema compat.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this reader is built for compaction.
    compaction: bool,
    /// Controls which filters are applied while reading row groups.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values from the encoded primary key.
    decode_primary_key_values: bool,
    /// Policy for loading the parquet page index.
    page_index_policy: PageIndexPolicy,
}
158
159impl ParquetReaderBuilder {
160 pub fn new(
162 table_dir: String,
163 path_type: PathType,
164 file_handle: FileHandle,
165 object_store: ObjectStore,
166 ) -> ParquetReaderBuilder {
167 ParquetReaderBuilder {
168 table_dir,
169 path_type,
170 file_handle,
171 object_store,
172 predicate: None,
173 read_cols: None,
174 cache_strategy: CacheStrategy::Disabled,
175 inverted_index_appliers: [None, None],
176 bloom_filter_index_appliers: [None, None],
177 fulltext_index_appliers: [None, None],
178 #[cfg(feature = "vector_index")]
179 vector_index_applier: None,
180 #[cfg(feature = "vector_index")]
181 vector_index_k: None,
182 expected_metadata: None,
183 compaction: false,
184 pre_filter_mode: PreFilterMode::All,
185 decode_primary_key_values: false,
186 page_index_policy: Default::default(),
187 }
188 }
189
190 #[must_use]
192 pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
193 self.predicate = predicate;
194 self
195 }
196
197 #[must_use]
201 pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
202 self.read_cols = read_cols;
203 self
204 }
205
206 #[must_use]
208 pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
209 self.cache_strategy = cache;
210 self
211 }
212
213 #[must_use]
215 pub(crate) fn inverted_index_appliers(
216 mut self,
217 index_appliers: [Option<InvertedIndexApplierRef>; 2],
218 ) -> Self {
219 self.inverted_index_appliers = index_appliers;
220 self
221 }
222
223 #[must_use]
225 pub(crate) fn bloom_filter_index_appliers(
226 mut self,
227 index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
228 ) -> Self {
229 self.bloom_filter_index_appliers = index_appliers;
230 self
231 }
232
233 #[must_use]
235 pub(crate) fn fulltext_index_appliers(
236 mut self,
237 index_appliers: [Option<FulltextIndexApplierRef>; 2],
238 ) -> Self {
239 self.fulltext_index_appliers = index_appliers;
240 self
241 }
242
243 #[cfg(feature = "vector_index")]
245 #[must_use]
246 pub(crate) fn vector_index_applier(
247 mut self,
248 applier: Option<VectorIndexApplierRef>,
249 k: Option<usize>,
250 ) -> Self {
251 self.vector_index_applier = applier;
252 self.vector_index_k = k;
253 self
254 }
255
256 #[must_use]
258 pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
259 self.expected_metadata = expected_metadata;
260 self
261 }
262
263 #[must_use]
265 pub fn compaction(mut self, compaction: bool) -> Self {
266 self.compaction = compaction;
267 self
268 }
269
270 #[must_use]
272 pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
273 self.pre_filter_mode = pre_filter_mode;
274 self
275 }
276
277 #[must_use]
279 pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
280 self.decode_primary_key_values = decode;
281 self
282 }
283
284 #[must_use]
285 pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
286 self.page_index_policy = page_index_policy;
287 self
288 }
289
290 #[tracing::instrument(
294 skip_all,
295 fields(
296 region_id = %self.file_handle.region_id(),
297 file_id = %self.file_handle.file_id()
298 )
299 )]
300 pub async fn build(&self) -> Result<Option<ParquetReader>> {
301 let mut metrics = ReaderMetrics::default();
302
303 let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
304 return Ok(None);
305 };
306 ParquetReader::new(Arc::new(context), selection)
307 .await
308 .map(Some)
309 }
310
    /// Builds the [`FileRangeContext`] and row-group selection for this file
    /// without constructing a reader.
    ///
    /// Returns `Ok(None)` when index/min-max pruning eliminates every row
    /// group. Pruning and build timings are recorded into `metrics`.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        self.build_reader_input_inner(metrics).await
    }
327
328 async fn build_reader_input_inner(
329 &self,
330 metrics: &mut ReaderMetrics,
331 ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
332 let start = Instant::now();
333
334 let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
335 let file_size = self.file_handle.meta_ref().file_size;
336
337 let (sst_meta, cache_miss) = self
339 .read_parquet_metadata(
340 &file_path,
341 file_size,
342 &mut metrics.metadata_cache_metrics,
343 self.page_index_policy,
344 )
345 .await?;
346 let parquet_meta = sst_meta.parquet_metadata();
347 let region_meta = sst_meta.region_metadata();
348 let region_partition_expr_str = self
349 .expected_metadata
350 .as_ref()
351 .and_then(|meta| meta.partition_expr.as_ref())
352 .map(|expr| expr.as_str());
353 let (_, is_same_region_partition) = Self::is_same_region_partition(
354 region_partition_expr_str,
355 self.file_handle.meta_ref().partition_expr.as_ref(),
356 )?;
357 let skip_auto_convert = self.compaction && is_same_region_partition;
361
362 let compaction_projection_mapper = if self.compaction
371 && !is_same_region_partition
372 && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
373 {
374 Some(CompactionProjectionMapper::try_new(®ion_meta)?)
375 } else {
376 None
377 };
378
379 let read_cols = if let Some(read_cols) = &self.read_cols {
380 read_cols.clone()
381 } else {
382 let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta);
383 ReadColumns::from_deduped_column_ids(
385 expected_meta
386 .column_metadatas
387 .iter()
388 .map(|col| col.column_id),
389 )
390 };
391 let mut read_format = FlatReadFormat::new(
392 region_meta.clone(),
393 read_cols,
394 Some(parquet_meta.file_metadata().schema_descr().num_columns()),
395 &file_path,
396 skip_auto_convert,
397 )?;
398 if need_override_sequence(&parquet_meta) {
399 read_format
400 .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
401 }
402
403 let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
405 let parquet_read_cols = read_format.parquet_read_columns();
406 let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
407 let selection = self
408 .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
409 .await;
410
411 if selection.is_empty() {
412 metrics.build_cost += start.elapsed();
413 return Ok(None);
414 }
415
416 if cache_miss && !selection.is_empty() {
418 use crate::cache::file_cache::{FileType, IndexKey};
419 let index_key = IndexKey::new(
420 self.file_handle.region_id(),
421 self.file_handle.file_id().file_id(),
422 FileType::Parquet,
423 );
424 self.cache_strategy.maybe_download_background(
425 index_key,
426 file_path.clone(),
427 self.object_store.clone(),
428 file_size,
429 );
430 }
431
432 let prune_schema = self
433 .expected_metadata
434 .as_ref()
435 .map(|meta| meta.schema.clone())
436 .unwrap_or_else(|| region_meta.schema.clone());
437
438 let arrow_reader_options =
440 ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
441 let arrow_metadata =
442 ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
443 .context(ReadDataPartSnafu)?;
444
445 let dyn_filters = if let Some(predicate) = &self.predicate {
446 predicate.dyn_filters().as_ref().clone()
447 } else {
448 vec![]
449 };
450
451 let codec = build_primary_key_codec(read_format.metadata());
452
453 let filter_plan = build_reader_filter_plan(
454 self.predicate.as_ref(),
455 self.expected_metadata.as_deref(),
456 self.pre_filter_mode,
457 &read_format,
458 parquet_meta.file_metadata().schema_descr(),
459 &codec,
460 );
461
462 let output_schema = read_format.output_arrow_schema()?;
463
464 let reader_builder = RowGroupReaderBuilder {
465 file_handle: self.file_handle.clone(),
466 file_path,
467 parquet_meta,
468 arrow_metadata,
469 output_schema,
470 object_store: self.object_store.clone(),
471 projection: projection_plan,
472 has_nested_projection: parquet_read_cols.has_nested(),
473 cache_strategy: self.cache_strategy.clone(),
474 prefilter_builder: filter_plan.prefilter_builder,
475 };
476
477 let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
478
479 let context = FileRangeContext::new(
480 reader_builder,
481 RangeBase {
482 filters: filter_plan.remaining_simple_filters,
483 dyn_filters,
484 read_format,
485 expected_metadata: self.expected_metadata.clone(),
486 prune_schema,
487 codec,
488 compat_batch: None,
489 compaction_projection_mapper,
490 pre_filter_mode: self.pre_filter_mode,
491 partition_filter,
492 },
493 );
494
495 metrics.build_cost += start.elapsed();
496
497 Ok(Some((context, selection)))
498 }
499
500 fn is_same_region_partition(
501 region_partition_expr_str: Option<&str>,
502 file_partition_expr: Option<&PartitionExpr>,
503 ) -> Result<(Option<PartitionExpr>, bool)> {
504 let region_partition_expr = match region_partition_expr_str {
505 Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
506 None => None,
507 };
508
509 let is_same = region_partition_expr.as_ref() == file_partition_expr;
510 Ok((region_partition_expr, is_same))
511 }
512
    /// Builds an optional partition-filter context used to drop rows that no
    /// longer belong to this region after a repartition.
    ///
    /// Returns `Ok(None)` when the file's partition expression matches the
    /// region's (no filtering needed) or when the region has no partition
    /// expression at all.
    fn build_partition_filter(
        &self,
        read_format: &FlatReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        // Same partition expression: every row in the file belongs to the region.
        if is_same_region_partition {
            return Ok(None);
        }

        // No region partition expression: nothing to filter with.
        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Restrict the schema to the columns the partition expression references.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a schema over the referenced columns. String tag columns are
        // rewritten to their (possibly dictionary-encoded) arrow representation
        // so the physical expression sees the same types as the read batches.
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }
580
    /// Reads the parquet/SST metadata for the file, consulting the cache first.
    ///
    /// Returns the metadata together with a cache-miss flag: `false` when the
    /// metadata was served from the cache, `true` when it was loaded from
    /// object storage (the loaded metadata is then inserted into the cache).
    /// Load time is accumulated into `cache_metrics.metadata_load_cost` on
    /// both paths.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        // Histogram timer observes on drop at the end of this function.
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Fast path: cached metadata.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Slow path: load the metadata from object storage.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        // Populate the cache for subsequent readers.
        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }
620
    /// Computes which row groups (and rows within them) to read, applying the
    /// pruning pipeline: min-max statistics, then fulltext, inverted, bloom
    /// filter, coarse fulltext, and (when enabled) vector indexes.
    ///
    /// Each stage intersects its result into the running selection, and the
    /// pipeline short-circuits as soon as the selection becomes empty.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &FlatReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Uses the first row group's size as the uniform row-group size —
        // assumes all row groups but the last share this size (the writer
        // presumably guarantees it); TODO confirm with the SST writer.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let skip_fields = self.pre_filter_mode.skip_fields();

        // Stage 1: min-max statistics pruning.
        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        // Stage 2: fine-grained fulltext index. Remember whether it ran so the
        // coarse fulltext stage below can be skipped when it did.
        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        // Stage 3: inverted index.
        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 4: bloom filter index.
        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 5: coarse (bloom-based) fulltext pruning, only when the fine
        // fulltext stage did not already prune.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        // Stage 6: vector (ANN) index, feature-gated.
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }
728
    /// Prunes `output` using the fine-grained fulltext index appliers.
    ///
    /// Consults the index result cache first; on a miss, applies the index,
    /// caches the result, and intersects it into `output`. Returns whether any
    /// applier actually pruned (used by the caller to skip the coarse
    /// fulltext stage).
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is used when field filters are skipped.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Cache hit is only usable when it covers every row group we still need.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                // `None` means the index was not applicable; leave `output` untouched.
                Ok(None) => continue,
                Err(err) => {
                    // Best-effort: log (or panic in tests) and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
798
    /// Prunes `output` using the inverted index appliers.
    ///
    /// Consults the index result cache first; on a miss, applies the index,
    /// caches the result, and intersects it into `output`. Returns whether any
    /// applier pruned.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is used when field filters are skipped.
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Cache hit is only usable when it covers every row group we still need.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    // Best-effort: log (or panic in tests) and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }
873
    /// Prunes `output` using the bloom filter index appliers.
    ///
    /// Only searches row groups that are still selected and not already
    /// covered by a cached result; the partial cached result (if any) is
    /// concatenated back into the fresh selection before caching and applying.
    /// Returns whether any applier pruned.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is used when field filters are skipped.
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Full cache hit: the cached result covers every row group we still need.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: (row count, whether to search it). Search only
            // row groups still selected and absent from the cached result.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    // Best-effort: log (or panic in tests) and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Merge the partial cached result so the cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }
958
    /// Prunes `output` down to the top-k rows selected by the vector (ANN)
    /// index. No-ops when the applier, `k`, or the index file is unavailable.
    ///
    /// Unlike the other index stages, vector results are not cached.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                // Best-effort: log (or panic in tests) and keep `output` unchanged.
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        // Convert global row offsets into a row-group selection.
        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }
1001
    /// Prunes `output` using the coarse (bloom-based) fulltext index; only run
    /// when the fine fulltext stage did not prune.
    ///
    /// Mirrors `prune_row_groups_by_bloom_filter`: searches only still-needed
    /// row groups, merges partial cached results, caches, and applies.
    /// Returns whether any applier pruned.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // Only the first applier slot is used when field filters are skipped.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            // Full cache hit: the cached result covers every row group we still need.
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: (row count, whether to search it). Search only
            // row groups still selected and absent from the cached result.
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                // `None` means the index was not applicable; leave `output` untouched.
                Ok(None) => continue,
                Err(err) => {
                    // Best-effort: log (or panic in tests) and skip this applier.
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Merge the partial cached result so the cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
1087
    /// Selects row groups by pruning with min-max statistics against the
    /// predicate. Returns a full selection when there is no predicate.
    ///
    /// Results are cached per (sorted predicate debug strings, schema version,
    /// skip_fields) key — but only when the predicate has no dynamic filters,
    /// since dynamic filter state is not captured by the key.
    fn row_groups_by_minmax(
        &self,
        read_format: &FlatReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        // No predicate: keep every row group.
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Sort the debug-formatted expressions so the key is stable
                // regardless of expression order.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        // Cache lookup.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        // Prune against the caller's schema when provided (may be newer than
        // the schema stored in the file).
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // `mask[i]` tells whether row group `i` may contain matching rows.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        // Populate the cache for subsequent readers.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }
1180
1181 fn apply_index_result_and_update_cache(
1182 &self,
1183 predicate_key: &PredicateKey,
1184 file_id: FileId,
1185 result: RowGroupSelection,
1186 output: &mut RowGroupSelection,
1187 metrics: &mut ReaderFilterMetrics,
1188 index_type: &str,
1189 ) {
1190 apply_selection_and_update_metrics(output, &result, metrics, index_type);
1191
1192 if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1193 index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1194 }
1195 }
1196}
1197fn apply_selection_and_update_metrics(
1198 output: &mut RowGroupSelection,
1199 result: &RowGroupSelection,
1200 metrics: &mut ReaderFilterMetrics,
1201 index_type: &str,
1202) {
1203 let intersection = output.intersect(result);
1204
1205 let row_group_count = output.row_group_count() - intersection.row_group_count();
1206 let row_count = output.row_count() - intersection.row_count();
1207
1208 metrics.update_index_metrics(index_type, row_group_count, row_count);
1209
1210 *output = intersection;
1211}
1212
/// Builds a [`RowGroupSelection`] from absolute row offsets returned by the
/// vector index.
///
/// Fails when an offset cannot be represented as a `u32` row id.
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    // Collect into a sorted, deduplicated id set; fail fast on overflow
    // instead of silently truncating.
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<_>>>()?;
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1235
1236fn all_required_row_groups_searched(
1237 required_row_groups: &RowGroupSelection,
1238 cached_row_groups: &RowGroupSelection,
1239) -> bool {
1240 required_row_groups.iter().all(|(rg_id, _)| {
1241 !required_row_groups.contains_non_empty_row_group(*rg_id)
1243 || cached_row_groups.contains_row_group(*rg_id)
1245 })
1246}
1247
/// Counters for how many row groups / rows each filtering stage removed,
/// plus cache hit statistics for the index caches.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered out by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered out by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered out by min-max pruning.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered out by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered out by the vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered out by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered out by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered out by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered out by the vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows the vector index selected (kept). NOTE(review): merged
    /// in `merge_from` but not exported by `observe` — confirm intended.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows removed by precise (row-level) filtering.
    pub(crate) rows_precise_filtered: usize,

    /// Fulltext index result cache hits.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Fulltext index result cache misses.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Inverted index result cache hits.
    pub(crate) inverted_index_cache_hit: usize,
    /// Inverted index result cache misses.
    pub(crate) inverted_index_cache_miss: usize,
    /// Bloom filter index result cache hits.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Bloom filter index result cache misses.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Min-max pruning result cache hits.
    pub(crate) minmax_cache_hit: usize,
    /// Min-max pruning result cache misses.
    pub(crate) minmax_cache_miss: usize,

    /// Detailed inverted index apply metrics, when collected.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed bloom filter index apply metrics, when collected.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed fulltext index apply metrics, when collected.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Pruner cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Pruner cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Time spent pruning.
    pub(crate) pruner_prune_cost: Duration,
}
1310
1311impl ReaderFilterMetrics {
1312 pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1314 self.rg_total += other.rg_total;
1315 self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1316 self.rg_inverted_filtered += other.rg_inverted_filtered;
1317 self.rg_minmax_filtered += other.rg_minmax_filtered;
1318 self.rg_bloom_filtered += other.rg_bloom_filtered;
1319 self.rg_vector_filtered += other.rg_vector_filtered;
1320
1321 self.rows_total += other.rows_total;
1322 self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1323 self.rows_inverted_filtered += other.rows_inverted_filtered;
1324 self.rows_bloom_filtered += other.rows_bloom_filtered;
1325 self.rows_vector_filtered += other.rows_vector_filtered;
1326 self.rows_vector_selected += other.rows_vector_selected;
1327 self.rows_precise_filtered += other.rows_precise_filtered;
1328
1329 self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1330 self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1331 self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1332 self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1333 self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1334 self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1335 self.minmax_cache_hit += other.minmax_cache_hit;
1336 self.minmax_cache_miss += other.minmax_cache_miss;
1337
1338 self.pruner_cache_hit += other.pruner_cache_hit;
1339 self.pruner_cache_miss += other.pruner_cache_miss;
1340 self.pruner_prune_cost += other.pruner_prune_cost;
1341
1342 if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1344 self.inverted_index_apply_metrics
1345 .get_or_insert_with(Default::default)
1346 .merge_from(other_metrics);
1347 }
1348 if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1349 self.bloom_filter_apply_metrics
1350 .get_or_insert_with(Default::default)
1351 .merge_from(other_metrics);
1352 }
1353 if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1354 self.fulltext_index_apply_metrics
1355 .get_or_insert_with(Default::default)
1356 .merge_from(other_metrics);
1357 }
1358 }
1359
1360 pub(crate) fn observe(&self) {
1362 READ_ROW_GROUPS_TOTAL
1363 .with_label_values(&["before_filtering"])
1364 .inc_by(self.rg_total as u64);
1365 READ_ROW_GROUPS_TOTAL
1366 .with_label_values(&["fulltext_index_filtered"])
1367 .inc_by(self.rg_fulltext_filtered as u64);
1368 READ_ROW_GROUPS_TOTAL
1369 .with_label_values(&["inverted_index_filtered"])
1370 .inc_by(self.rg_inverted_filtered as u64);
1371 READ_ROW_GROUPS_TOTAL
1372 .with_label_values(&["minmax_index_filtered"])
1373 .inc_by(self.rg_minmax_filtered as u64);
1374 READ_ROW_GROUPS_TOTAL
1375 .with_label_values(&["bloom_filter_index_filtered"])
1376 .inc_by(self.rg_bloom_filtered as u64);
1377 READ_ROW_GROUPS_TOTAL
1378 .with_label_values(&["vector_index_filtered"])
1379 .inc_by(self.rg_vector_filtered as u64);
1380
1381 PRECISE_FILTER_ROWS_TOTAL
1382 .with_label_values(&["parquet"])
1383 .inc_by(self.rows_precise_filtered as u64);
1384 READ_ROWS_IN_ROW_GROUP_TOTAL
1385 .with_label_values(&["before_filtering"])
1386 .inc_by(self.rows_total as u64);
1387 READ_ROWS_IN_ROW_GROUP_TOTAL
1388 .with_label_values(&["fulltext_index_filtered"])
1389 .inc_by(self.rows_fulltext_filtered as u64);
1390 READ_ROWS_IN_ROW_GROUP_TOTAL
1391 .with_label_values(&["inverted_index_filtered"])
1392 .inc_by(self.rows_inverted_filtered as u64);
1393 READ_ROWS_IN_ROW_GROUP_TOTAL
1394 .with_label_values(&["bloom_filter_index_filtered"])
1395 .inc_by(self.rows_bloom_filtered as u64);
1396 READ_ROWS_IN_ROW_GROUP_TOTAL
1397 .with_label_values(&["vector_index_filtered"])
1398 .inc_by(self.rows_vector_filtered as u64);
1399 }
1400
1401 fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1402 match index_type {
1403 INDEX_TYPE_FULLTEXT => {
1404 self.rg_fulltext_filtered += row_group_count;
1405 self.rows_fulltext_filtered += row_count;
1406 }
1407 INDEX_TYPE_INVERTED => {
1408 self.rg_inverted_filtered += row_group_count;
1409 self.rows_inverted_filtered += row_count;
1410 }
1411 INDEX_TYPE_BLOOM => {
1412 self.rg_bloom_filtered += row_group_count;
1413 self.rows_bloom_filtered += row_count;
1414 }
1415 INDEX_TYPE_VECTOR => {
1416 self.rg_vector_filtered += row_group_count;
1417 self.rows_vector_filtered += row_count;
1418 }
1419 _ => {}
1420 }
1421 }
1422}
1423
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    /// Offsets spread over several row groups select each touched group.
    #[test]
    fn test_vector_selection_from_offsets() {
        // 3 row groups of 4 rows: offsets 0/1 land in group 0, 5 in group 1,
        // 9 in group 2 — 4 distinct rows across 3 groups.
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    /// Offsets above `u32::MAX` must error instead of silently truncating.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    /// Intersecting a full 8-row selection with a single-row vector selection
    /// should report 7 rows (and 1 row group) filtered by the vector index.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1475
/// Metrics about parquet metadata loading and cache usage.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Times the metadata was found in the in-memory cache.
    pub(crate) mem_cache_hit: usize,
    /// Times the metadata was found in the file cache.
    pub(crate) file_cache_hit: usize,
    /// Times the metadata had to be loaded from the underlying store.
    pub(crate) cache_miss: usize,
    /// Total time spent loading metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations issued while loading metadata.
    pub(crate) num_reads: usize,
    /// Number of bytes read while loading metadata.
    pub(crate) bytes_read: u64,
}
1492
1493impl std::fmt::Debug for MetadataCacheMetrics {
1494 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1495 let Self {
1496 mem_cache_hit,
1497 file_cache_hit,
1498 cache_miss,
1499 metadata_load_cost,
1500 num_reads,
1501 bytes_read,
1502 } = self;
1503
1504 if self.is_empty() {
1505 return write!(f, "{{}}");
1506 }
1507 write!(f, "{{")?;
1508
1509 write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;
1510
1511 if *mem_cache_hit > 0 {
1512 write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
1513 }
1514 if *file_cache_hit > 0 {
1515 write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
1516 }
1517 if *cache_miss > 0 {
1518 write!(f, ", \"cache_miss\":{}", cache_miss)?;
1519 }
1520 if *num_reads > 0 {
1521 write!(f, ", \"num_reads\":{}", num_reads)?;
1522 }
1523 if *bytes_read > 0 {
1524 write!(f, ", \"bytes_read\":{}", bytes_read)?;
1525 }
1526
1527 write!(f, "}}")
1528 }
1529}
1530
1531impl MetadataCacheMetrics {
1532 pub(crate) fn is_empty(&self) -> bool {
1534 self.metadata_load_cost.is_zero()
1535 }
1536
1537 pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1539 self.mem_cache_hit += other.mem_cache_hit;
1540 self.file_cache_hit += other.file_cache_hit;
1541 self.cache_miss += other.cache_miss;
1542 self.metadata_load_cost += other.metadata_load_cost;
1543 self.num_reads += other.num_reads;
1544 self.bytes_read += other.bytes_read;
1545 }
1546}
1547
/// Metrics collected while reading an SST file.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtering (index / min-max / precise) metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Time spent building the reader.
    pub(crate) build_cost: Duration,
    /// Time spent scanning data.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read. NOTE(review): distinction from
    /// `num_batches` not visible here — confirm against the call sites.
    pub(crate) num_record_batches: usize,
    /// Number of batches produced.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Parquet metadata cache metrics.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Shared parquet fetch metrics, when collected.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    /// Memory used by loaded metadata. NOTE(review): signed, presumably so
    /// negative deltas can be merged — confirm.
    pub(crate) metadata_mem_size: isize,
    /// Number of range builders. NOTE(review): signed like
    /// `metadata_mem_size` — confirm whether decrements are expected.
    pub(crate) num_range_builders: isize,
}
1572
1573impl ReaderMetrics {
1574 pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1576 self.filter_metrics.merge_from(&other.filter_metrics);
1577 self.build_cost += other.build_cost;
1578 self.scan_cost += other.scan_cost;
1579 self.num_record_batches += other.num_record_batches;
1580 self.num_batches += other.num_batches;
1581 self.num_rows += other.num_rows;
1582 self.metadata_cache_metrics
1583 .merge_from(&other.metadata_cache_metrics);
1584 if let Some(other_fetch) = &other.fetch_metrics {
1585 if let Some(self_fetch) = &self.fetch_metrics {
1586 self_fetch.merge_from(other_fetch);
1587 } else {
1588 self.fetch_metrics = Some(other_fetch.clone());
1589 }
1590 }
1591 self.metadata_mem_size += other.metadata_mem_size;
1592 self.num_range_builders += other.num_range_builders;
1593 }
1594
1595 pub(crate) fn observe_rows(&self, read_type: &str) {
1597 READ_ROWS_TOTAL
1598 .with_label_values(&[read_type])
1599 .inc_by(self.num_rows as u64);
1600 }
1601}
1602
/// Builds per-row-group record batch streams for one SST file.
pub(crate) struct RowGroupReaderBuilder {
    /// Handle of the file to read.
    file_handle: FileHandle,
    /// Full path of the file.
    file_path: String,
    /// Decoded parquet metadata of the file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Arrow reader metadata derived from the parquet metadata.
    arrow_metadata: ArrowReaderMetadata,
    /// Arrow schema of the batches emitted to callers.
    output_schema: SchemaRef,
    /// Object store the file lives in.
    object_store: ObjectStore,
    /// Projection plan (column mask plus nested-root presence).
    projection: ProjectionMaskPlan,
    /// Whether the projection selects nested columns and therefore needs
    /// schema alignment on the output stream.
    has_nested_projection: bool,
    /// Cache strategy used while reading.
    cache_strategy: CacheStrategy,
    /// Builder for the optional predicate prefilter.
    prefilter_builder: Option<PrefilterContextBuilder>,
}
1628
/// Per-row-group inputs for [`RowGroupReaderBuilder::build`].
pub(crate) struct RowGroupBuildContext<'a> {
    /// Index of the row group to read.
    pub(crate) row_group_idx: usize,
    /// Optional row selection inside the row group.
    pub(crate) row_selection: Option<RowSelection>,
    /// Optional fetch metrics to record into.
    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
}
1639
impl RowGroupReaderBuilder {
    /// Path of the SST file this builder reads.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the SST file this builder reads.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Decoded parquet metadata of the file.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy used while reading.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Whether a predicate prefilter is configured.
    pub(crate) fn has_predicate_prefilter(&self) -> bool {
        self.prefilter_builder.is_some()
    }

    /// Builds a projected record batch stream for the row group described by
    /// `build_ctx`.
    ///
    /// When a prefilter is configured it runs first to refine the row
    /// selection, and its cost / filtered-row count are recorded into the
    /// fetch metrics; otherwise the stream is built directly with the
    /// caller-provided selection.
    pub(crate) async fn build(
        &self,
        build_ctx: RowGroupBuildContext<'_>,
    ) -> Result<ProjectedRecordBatchStream> {
        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());

        // Fast path: no prefilter configured.
        let Some(mut prefilter_ctx) = prefilter_ctx else {
            let stream = self
                .build_with_projection(
                    build_ctx.row_group_idx,
                    build_ctx.row_selection,
                    self.projection.mask.clone(),
                    build_ctx.fetch_metrics,
                )
                .await?;
            return self.make_projected_stream(stream);
        };

        let prefilter_start = Instant::now();
        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
        if let Some(metrics) = build_ctx.fetch_metrics {
            // Keep the lock scope to the two metric updates only.
            let mut data = metrics.data.lock().unwrap();
            data.prefilter_cost += prefilter_start.elapsed();
            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
        }

        let refined_selection = Some(prefilter_result.refined_selection);

        let stream = self
            .build_with_projection(
                build_ctx.row_group_idx,
                refined_selection,
                self.projection.mask.clone(),
                build_ctx.fetch_metrics,
            )
            .await?;
        self.make_projected_stream(stream)
    }

    /// Wraps the raw parquet stream with error adaptation and, when the
    /// projection contains nested columns, with schema alignment to the
    /// output schema.
    fn make_projected_stream(
        &self,
        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
    ) -> Result<ProjectedRecordBatchStream> {
        let stream = ParquetErrorAdapter::new(stream, self.file_path.clone());
        if !self.has_nested_projection {
            return Ok(stream.boxed());
        }

        Ok(NestedSchemaAligner::new(
            stream,
            self.projection.projected_root_presence.clone(),
            self.output_schema.clone(),
        )?
        .boxed())
    }

    /// Builds a [`ParquetRecordBatchStream`] over a single row group with the
    /// given projection mask and optional row selection.
    pub(crate) async fn build_with_projection(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        projection: ProjectionMask,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
        let async_reader = SstAsyncFileReader::new(
            self.file_handle.file_id(),
            self.file_path.clone(),
            self.object_store.clone(),
            self.cache_strategy.clone(),
            self.parquet_meta.clone(),
            row_group_idx,
        )
        .with_fetch_metrics(fetch_metrics.cloned());

        // Reuse the pre-parsed arrow metadata so the builder doesn't decode
        // the footer again.
        let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
            async_reader,
            self.arrow_metadata.clone(),
        );
        builder = builder
            .with_row_groups(vec![row_group_idx])
            .with_projection(projection)
            .with_batch_size(DEFAULT_READ_BATCH_SIZE);

        if let Some(selection) = row_selection {
            builder = builder.with_row_selection(selection);
        }

        let stream = builder.build().context(ReadParquetSnafu {
            path: &self.file_path,
        })?;

        Ok(stream)
    }
}
1778
/// A simple filter, or a constant verdict precomputed from metadata.
#[derive(Clone)]
pub(crate) enum MaybeFilter {
    /// The filter must still be evaluated against row data.
    Filter(SimpleFilterEvaluator),
    /// Known to match (evaluated against the column's default value).
    Matched,
    /// Known not to match; the file can be pruned for this filter.
    Pruned,
}
1789
1790impl MaybeFilter {
1791 pub(crate) fn as_filter(&self) -> Option<&SimpleFilterEvaluator> {
1793 match self {
1794 MaybeFilter::Filter(filter) => Some(filter),
1795 MaybeFilter::Matched | MaybeFilter::Pruned => None,
1796 }
1797 }
1798}
1799
/// A single-column simple filter together with the column it applies to.
#[derive(Clone)]
pub(crate) struct SimpleFilterContext {
    /// The filter, or a precomputed constant verdict.
    filter: MaybeFilter,
    /// Id of the filtered column.
    column_id: ColumnId,
    /// Semantic type of the filtered column.
    semantic_type: SemanticType,
}
1810
impl SimpleFilterContext {
    /// Creates a filter context from `expr`, or `None` when the expression is
    /// not a simple filter or references an unknown column.
    ///
    /// When `expected_meta` (the metadata the caller expects) is given, the
    /// column is resolved there by name and then looked up by id in the SST's
    /// own metadata:
    /// - if the SST has the column, the filter is kept for row evaluation;
    /// - if the SST lacks the column, every row implicitly holds the default
    ///   value, so the filter collapses into a constant `Pruned`/`Matched`
    ///   verdict for the whole file.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Resolve by name in the expected metadata, then by id in the
                // SST metadata, so dropped/added columns are detected by id.
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // Column absent from this SST: the filter result is
                        // the same for every row, decided by the default value.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
        })
    }

    /// The filter or constant verdict for this column.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Id of the filtered column.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Semantic type of the filtered column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }
}
1874
/// A compiled physical expression used to prefilter a single column.
#[derive(Clone)]
pub(crate) struct PhysicalFilterContext {
    /// The compiled physical filter expression.
    filter: Arc<dyn PhysicalExpr>,
    /// Id of the filtered column.
    column_id: ColumnId,
    /// Name of the filtered column.
    column_name: String,
    /// Semantic type of the filtered column.
    semantic_type: SemanticType,
    /// Single-column arrow schema the expression evaluates against.
    schema: SchemaRef,
}
1889
impl PhysicalFilterContext {
    /// Creates a prefilter context from `expr`, or `None` when the expression
    /// is not a supported prefilter shape, does not reference exactly one
    /// known column, or cannot be compiled to a physical expression.
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        read_format: &FlatReadFormat,
        expr: &Expr,
    ) -> Option<Self> {
        if !Self::is_prefilter_candidate(expr) {
            return None;
        }
        let column_name = Self::single_column_name(expr)?;
        let column_metadata = match expected_meta {
            Some(meta) => {
                let column = meta.column_by_name(&column_name)?;
                let sst_column = sst_meta.column_by_id(column.column_id)?;
                // Reject columns whose name differs between the expected and
                // SST metadata (e.g. a rename reusing the same id).
                if sst_column.column_schema.name != column_name {
                    return None;
                }
                column
            }
            None => sst_meta.column_by_name(&column_name)?,
        };

        // Build a single-column schema so the physical expression evaluates
        // against just the filtered column's array.
        let (_, field) = read_format.arrow_schema().column_with_name(&column_name)?;
        let field = field.clone();
        let schema = Arc::new(ArrowSchema::new(vec![field]));
        let physical_expr = Predicate::to_physical_expr(expr, &schema)
            .inspect_err(|e| {
                error!(e; "Unable to build physical filter for {expr}, schema: {schema:?}");
            })
            .ok()?;

        Some(Self {
            filter: physical_expr,
            column_id: column_metadata.column_id,
            column_name,
            semantic_type: column_metadata.semantic_type,
            schema,
        })
    }

    /// Whether `expr` has one of the expression shapes the prefilter supports.
    fn is_prefilter_candidate(expr: &Expr) -> bool {
        matches!(
            expr,
            Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Between(_)
        )
    }

    /// Returns the single column referenced by `expr`, or `None` if the
    /// expression references zero or multiple columns.
    fn single_column_name(expr: &Expr) -> Option<String> {
        let mut columns = HashSet::new();
        if expr_to_columns(expr, &mut columns).is_err() {
            return None;
        }
        if columns.len() != 1 {
            return None;
        }
        columns.iter().next().map(|column| column.name.clone())
    }

    /// The compiled physical filter expression.
    pub(crate) fn filter(&self) -> &Arc<dyn PhysicalExpr> {
        &self.filter
    }

    /// Id of the filtered column.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Name of the filtered column.
    pub(crate) fn column_name(&self) -> &str {
        &self.column_name
    }

    /// Semantic type of the filtered column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Single-column arrow schema the filter evaluates against.
    pub(crate) fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}
1985
1986fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1989 let value = column.column_schema.create_default().ok().flatten()?;
1990 let scalar_value = value
1991 .try_to_scalar_value(&column.column_schema.data_type)
1992 .ok()?;
1993 let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1994 Some(!matches)
1995}
1996
/// Parquet SST reader that iterates the selected row groups and yields
/// converted record batches.
pub struct ParquetReader {
    /// Shared context of the file range being read.
    context: FileRangeContextRef,
    /// Remaining (row group, row selection) pairs to read.
    selection: RowGroupSelection,
    /// Reader over the current row group; `None` between row groups or when
    /// exhausted.
    reader: Option<FlatPruneReader>,
    /// Fetch metrics shared with the per-row-group readers.
    fetch_metrics: ParquetFetchMetrics,
}
2008
impl ParquetReader {
    /// Returns the next converted record batch, or `None` once all selected
    /// row groups are exhausted.
    ///
    /// Row groups are read lazily: when the current row-group reader runs
    /// dry, the next (row group, selection) pair is popped from `selection`
    /// and a fresh reader is built for it.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        loop {
            if let Some(reader) = &mut self.reader {
                if let Some(batch) = reader.next_batch().await? {
                    return Ok(Some(batch));
                }
                // Current row group is exhausted; move on to the next one.
                self.reader = None;
                continue;
            }

            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
                return Ok(None);
            };

            let skip_fields = self.context.pre_filter_mode().skip_fields();
            let parquet_reader = self
                .context
                .reader_builder()
                .build(self.context.build_context(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                ))
                .await?;
            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            ));
        }
    }
    /// Creates a reader over the row groups in `selection`, eagerly building
    /// the reader for the first selected row group (if any).
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let skip_fields = context.pre_filter_mode().skip_fields();
            let parquet_reader = context
                .reader_builder()
                .build(context.build_context(
                    row_group_idx,
                    Some(row_selection),
                    Some(&fetch_metrics),
                ))
                .await?;
            Some(FlatPruneReader::new_with_row_group_reader(
                context.clone(),
                FlatRowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            None
        };

        Ok(ParquetReader {
            context,
            selection,
            reader,
            fetch_metrics,
        })
    }

    /// Region metadata of the file being read.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    /// Decoded parquet metadata of the file being read.
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}
2097
/// Reads batches from a single row-group stream and converts them into the
/// output format.
pub(crate) struct FlatRowGroupReader {
    /// Shared context of the file range being read.
    context: FileRangeContextRef,
    /// Underlying projected record batch stream.
    stream: ProjectedRecordBatchStream,
    /// Pre-built array used to override sequence values, when the read
    /// format requires it.
    override_sequence: Option<ArrayRef>,
}
2107
2108impl FlatRowGroupReader {
2109 pub(crate) fn new(context: FileRangeContextRef, stream: ProjectedRecordBatchStream) -> Self {
2111 let override_sequence = context
2113 .read_format()
2114 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2115
2116 Self {
2117 context,
2118 stream,
2119 override_sequence,
2120 }
2121 }
2122
2123 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2125 match self.stream.next().await {
2126 Some(batch_result) => {
2127 let record_batch = batch_result?;
2128
2129 let record_batch = self
2130 .context
2131 .read_format()
2132 .convert_batch(record_batch, self.override_sequence.as_ref())?;
2133 Ok(Some(record_batch))
2134 }
2135 None => Ok(None),
2136 }
2137 }
2138}
2139
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::fmt::{Debug, Formatter};
    use std::sync::{Arc, LazyLock};

    use datafusion::arrow::datatypes::DataType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{
        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
        col, lit,
    };
    use datatypes::arrow::array::{ArrayRef, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use parquet::arrow::ArrowWriter;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use table::predicate::Predicate;

    use super::*;
    use crate::sst::parquet::metadata::MetadataLoader;
    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};

    /// When the index result cache is disabled, min-max pruning must not build
    /// a predicate key (which would format the exprs). The UDF panics in its
    /// `Debug` impl, so any formatting attempt fails the test.
    #[tokio::test(flavor = "current_thread")]
    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
        // UDF whose Debug impl panics to detect unexpected formatting.
        #[derive(Eq, PartialEq, Hash)]
        struct PanicDebugUdf;

        impl Debug for PanicDebugUdf {
            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
                panic!("minmax predicate key should not format exprs when cache is disabled");
            }
        }

        impl ScalarUDFImpl for PanicDebugUdf {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                "panic_debug_udf"
            }

            fn signature(&self) -> &Signature {
                static SIGNATURE: LazyLock<Signature> =
                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
                &SIGNATURE
            }

            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
            }
        }

        // Write a tiny single-column parquet file into an in-memory store.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_handle = sst_file_handle(0, 1);
        let table_dir = "test_table".to_string();
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(&table_dir, path_type);

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let mut parquet_bytes = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_size = parquet_bytes.len() as u64;
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format = FlatReadFormat::new(
            region_metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|column| column.column_id),
            ),
            None,
            &file_path,
            false,
        )
        .unwrap();

        let mut cache_metrics = MetadataCacheMetrics::default();
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();

        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
            udf,
            vec![],
        ))]);
        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
            .predicate(Some(predicate))
            .cache(CacheStrategy::Disabled);

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
        let mut metrics = ReaderFilterMetrics::default();
        // Would panic via PanicDebugUdf::fmt if the predicate key were built.
        let selection = builder.row_groups_by_minmax(
            &read_format,
            &parquet_meta,
            row_group_size,
            total_row_count,
            &mut metrics,
            false,
        );

        assert!(!selection.is_empty());
    }

    /// Builds expected metadata where "tag_0" exists under a different column
    /// id (10) than in the SST metadata, simulating a drop-and-recreate that
    /// reuses the tag name.
    fn expected_metadata_with_reused_tag_name(
        old_metadata: &RegionMetadata,
    ) -> Arc<RegionMetadata> {
        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![10, 1]);

        Arc::new(builder.build().unwrap())
    }

    /// A filter on a column id missing from the SST collapses into a constant
    /// Matched/Pruned verdict based on the column's default value.
    #[test]
    fn test_simple_filter_context_uses_default_value_for_mismatched_expected_metadata() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
        let ctx = SimpleFilterContext::new_opt(
            &metadata,
            Some(expected_metadata.as_ref()),
            &col("tag_0").eq(lit("a")),
        )
        .unwrap();
        assert!(matches!(
            ctx.filter(),
            MaybeFilter::Matched | MaybeFilter::Pruned
        ));
    }

    /// The physical prefilter must reject a column whose name no longer maps
    /// to the same column in the SST (renamed/recreated column).
    #[test]
    fn test_physical_filter_context_skips_renamed_column() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();

        let ctx = PhysicalFilterContext::new_opt(
            &metadata,
            Some(expected_metadata.as_ref()),
            &read_format,
            &col("tag_0").in_list(vec![lit("a"), lit("b")], false),
        );

        assert!(ctx.is_none());
    }

    /// Only InList / IsNull / IsNotNull / Between expressions are accepted as
    /// prefilter candidates; plain binary comparisons are not.
    #[test]
    fn test_physical_filter_context_only_accepts_prefilter_candidates() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();

        let in_list = col("tag_0").in_list(vec![lit("a"), lit("b")], false);
        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &in_list).is_some());

        let not_in = col("tag_0").in_list(vec![lit("a"), lit("b")], true);
        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &not_in).is_some());

        let is_null = col("tag_0").is_null();
        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_null).is_some());
        let is_not_null = col("tag_0").is_not_null();
        assert!(
            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &is_not_null).is_some()
        );

        let between = col("field_0").between(lit(1_u64), lit(10_u64));
        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &between).is_some());

        let binary = col("tag_0").eq(lit("a"));
        assert!(PhysicalFilterContext::new_opt(&metadata, None, &read_format, &binary).is_none());
    }
}