1use std::collections::HashSet;
22use std::ops::{BitAnd, Range};
23use std::sync::Arc;
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::datatypes::SchemaRef;
30use datatypes::arrow::record_batch::RecordBatch;
31use futures::StreamExt;
32use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
33use parquet::arrow::ProjectionMask;
34use parquet::arrow::arrow_reader::RowSelection;
35use parquet::schema::types::SchemaDescriptor;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
40use table::predicate::Predicate;
41
42use crate::cache::PrefilterKey;
43use crate::error::{
44 ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
45 RecordBatchSnafu, Result, UnexpectedSnafu,
46};
47use crate::sst::parquet::file_range::PreFilterMode;
48use crate::sst::parquet::flat_format::FlatReadFormat;
49use crate::sst::parquet::format::PrimaryKeyArray;
50use crate::sst::parquet::reader::{
51 MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
52 SimpleFilterContext,
53};
54
55pub(crate) fn matching_row_ranges_by_primary_key(
56 input: &RecordBatch,
57 pk_column_index: usize,
58 pk_filter: &mut dyn PrimaryKeyFilter,
59) -> Result<Vec<Range<usize>>> {
60 let pk_dict_array = input
61 .column(pk_column_index)
62 .as_any()
63 .downcast_ref::<PrimaryKeyArray>()
64 .context(UnexpectedSnafu {
65 reason: "Primary key column is not a dictionary array",
66 })?;
67 let pk_values = pk_dict_array
68 .values()
69 .as_any()
70 .downcast_ref::<BinaryArray>()
71 .context(UnexpectedSnafu {
72 reason: "Primary key values are not binary array",
73 })?;
74 let keys = pk_dict_array.keys();
75 let key_values = keys.values();
76
77 if key_values.is_empty() {
78 return Ok(std::iter::once(0..input.num_rows()).collect());
79 }
80
81 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
82 let mut start = 0;
83 while start < key_values.len() {
84 let key = key_values[start];
85 let mut end = start + 1;
86 while end < key_values.len() && key_values[end] == key {
87 end += 1;
88 }
89
90 if pk_filter
91 .matches(pk_values.value(key as usize))
92 .context(DecodeSnafu)?
93 {
94 if let Some(last) = matched_row_ranges.last_mut()
95 && last.end == start
96 {
97 last.end = end;
98 } else {
99 matched_row_ranges.push(start..end);
100 }
101 }
102
103 start = end;
104 }
105
106 Ok(matched_row_ranges)
107}
108
109pub(crate) fn prefilter_flat_batch_by_primary_key(
112 input: RecordBatch,
113 pk_column_index: usize,
114 pk_filter: &mut dyn PrimaryKeyFilter,
115) -> Result<Option<RecordBatch>> {
116 if input.num_rows() == 0 {
117 return Ok(Some(input));
118 }
119
120 let matched_row_ranges =
121 matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
122 if matched_row_ranges.is_empty() {
123 return Ok(None);
124 }
125
126 if matched_row_ranges.len() == 1
127 && matched_row_ranges[0].start == 0
128 && matched_row_ranges[0].end == input.num_rows()
129 {
130 return Ok(Some(input));
131 }
132
133 if matched_row_ranges.len() == 1 {
134 let span = &matched_row_ranges[0];
135 return Ok(Some(input.slice(span.start, span.end - span.start)));
136 }
137
138 let mut builder = BooleanBufferBuilder::new(input.num_rows());
139 builder.append_n(input.num_rows(), false);
140 for span in matched_row_ranges {
141 for i in span {
142 builder.set_bit(i, true);
143 }
144 }
145
146 let filtered = datatypes::arrow::compute::filter_record_batch(
147 &input,
148 &BooleanArray::new(builder.finish(), None),
149 )
150 .context(ComputeArrowSnafu)?;
151 if filtered.num_rows() == 0 {
152 Ok(None)
153 } else {
154 Ok(Some(filtered))
155 }
156}
157
158pub(crate) struct CachedPrimaryKeyFilter {
159 inner: Box<dyn PrimaryKeyFilter>,
160 last_primary_key: Vec<u8>,
161 last_match: Option<bool>,
162}
163
164impl CachedPrimaryKeyFilter {
165 pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
166 Self {
167 inner,
168 last_primary_key: Vec::new(),
169 last_match: None,
170 }
171 }
172}
173
174impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
175 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
176 if let Some(last_match) = self.last_match
177 && self.last_primary_key == pk
178 {
179 return Ok(last_match);
180 }
181
182 let matched = self.inner.matches(pk)?;
183 self.last_primary_key.clear();
184 self.last_primary_key.extend_from_slice(pk);
185 self.last_match = Some(matched);
186 Ok(matched)
187 }
188}
189
/// Filter split produced by [`build_bulk_filter_plan`].
pub(crate) struct BulkFilterPlan {
    /// Simple filters that were not extracted into `pk_filters`.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Evaluators of the filters on tag columns, or `None` when nothing was extracted.
    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
202
/// Filter split produced by [`build_reader_filter_plan`].
pub(crate) struct ReaderFilterPlan {
    /// Simple filters that are not handled by the prefilter.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Builder for the prefilter context, or `None` when no prefilter is planned.
    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
}
221
222pub(crate) fn build_bulk_filter_plan(
223 read_format: &FlatReadFormat,
224 predicate: Option<&Predicate>,
225) -> BulkFilterPlan {
226 let metadata = read_format.metadata();
227 let simple_filters: Vec<SimpleFilterContext> = predicate
230 .into_iter()
231 .flat_map(|predicate| {
232 predicate
233 .exprs()
234 .iter()
235 .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
236 })
237 .collect();
238
239 if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
243 return BulkFilterPlan {
244 remaining_simple_filters: simple_filters,
245 pk_filters: None,
246 };
247 }
248
249 let mut remaining_simple_filters = Vec::new();
250 let mut pk_filters = Vec::new();
251
252 for filter_ctx in simple_filters {
253 let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
256 (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
257 });
258
259 if let Some(pk_filter) = pk_filter {
260 pk_filters.push(pk_filter);
261 } else {
262 remaining_simple_filters.push(filter_ctx);
263 }
264 }
265
266 BulkFilterPlan {
267 remaining_simple_filters,
268 pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
269 }
270}
271
/// Classifies the expressions of `predicate` and plans an optional prefilter for the reader.
///
/// Each expression that yields a [`SimpleFilterContext`] goes to exactly one bucket:
/// - filters the prefilter can evaluate directly on a column of the batch,
/// - tag filters that must be evaluated against the encoded primary key,
/// - remaining filters that are returned to the caller untouched.
///
/// Expressions that only yield a [`PhysicalFilterContext`] join the prefilter when their
/// semantic type allows direct prefiltering.
///
/// When [`PrefilterContextBuilder::new`] declines to build a prefilter, the simple filters
/// collected for it are handed back through `remaining_simple_filters`.
///
/// # Panics
///
/// Panics if a filter selected for direct prefiltering references a column that is missing
/// from the arrow schema of `read_format`.
pub(crate) fn build_reader_filter_plan(
    predicate: Option<&Predicate>,
    expected_metadata: Option<&RegionMetadata>,
    pre_filter_mode: PreFilterMode,
    read_format: &FlatReadFormat,
    codec: &Arc<dyn PrimaryKeyCodec>,
) -> ReaderFilterPlan {
    // Without a predicate there is nothing to plan.
    let Some(predicate) = predicate else {
        return ReaderFilterPlan {
            remaining_simple_filters: Vec::new(),
            prefilter_builder: None,
        };
    };

    let metadata = read_format.metadata();
    let mut prefilter_simple_filters = Vec::new();
    let mut remaining_simple_filters = Vec::new();
    let mut prefilter_physical_filters = Vec::new();
    let mut primary_key_filters = Vec::new();
    let mut pk_filter_contexts = Vec::new();

    // Field columns are only prefiltered when the mode is `PreFilterMode::All`.
    let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
    // Tag filters go through the primary key filter unless the batch has raw pk columns.
    let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();

    // Decides whether a filter of the given semantic type can run directly on a batch column.
    let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
        match semantic_type {
            SemanticType::Tag => !need_pk_prefilter,
            SemanticType::Field => field_prefilter_enabled,
            SemanticType::Timestamp => true,
        }
    };

    for expr in predicate.exprs() {
        if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
            // Contexts without an evaluator are left for the caller to handle.
            let Some(filter) = filter_ctx.filter().as_filter() else {
                remaining_simple_filters.push(filter_ctx);
                continue;
            };

            let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
            if direct_prefilter {
                assert!(
                    read_format
                        .arrow_schema()
                        .column_with_name(filter.column_name())
                        .is_some(),
                    "Column '{}' is not present in the arrow schema {:?}",
                    filter.column_name(),
                    read_format.arrow_schema(),
                );
                prefilter_simple_filters.push(filter_ctx);
                continue;
            }

            if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
                // Keep both the evaluator (for the pk filter) and the context (for the
                // cache key and for the fallback below).
                primary_key_filters.push(filter.clone());
                pk_filter_contexts.push(filter_ctx);
            } else {
                remaining_simple_filters.push(filter_ctx);
            }
            continue;
        }

        // Not a simple filter: try it as a physical filter.
        if let Some(filter) =
            PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
            && can_direct_prefilter(filter.semantic_type())
        {
            prefilter_physical_filters.push(filter);
        }
    }

    // Sort the expression strings so their order in the predicate does not matter.
    let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
        let mut expr_strs = pk_filter_contexts
            .iter()
            .map(|filter_ctx| filter_ctx.expr_str().to_string())
            .collect::<Vec<_>>();
        expr_strs.sort();
        SmallVec::from_vec(expr_strs)
    });
    let pk_filter_exprs =
        (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
    // Prefer the schema version of the expected metadata when it is provided.
    let schema_version = expected_metadata
        .map(|metadata| metadata.schema_version)
        .unwrap_or_else(|| read_format.metadata().schema_version);
    let prefilter_builder = PrefilterContextBuilder::new(
        read_format,
        codec,
        pk_filter_exprs,
        pk_filter_expr_strs,
        prefilter_simple_filters.clone(),
        prefilter_physical_filters,
        schema_version,
    );

    if prefilter_builder.is_some() {
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder,
        }
    } else {
        // No prefilter: hand the simple filters collected for it back to the caller.
        remaining_simple_filters.extend(prefilter_simple_filters);
        remaining_simple_filters.extend(pk_filter_contexts);
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder: None,
        }
    }
}
416
/// State used while running the prefilter.
pub(crate) struct PrefilterContext {
    /// Filter evaluated against the encoded primary key column, if any.
    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Simple filters evaluated on individual columns of the prefilter batch.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters evaluated on individual columns of the prefilter batch.
    physical_filters: Vec<PhysicalFilterContext>,
    /// Schema version that becomes part of every prefilter cache key.
    schema_version: u64,
    /// Expression strings identifying the primary key filter group in the cache key.
    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
    /// Arrow schema used to resolve prefilter column names into projection indices.
    arrow_schema: SchemaRef,
}
433
434pub(crate) struct PrefilterContextBuilder {
440 pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
441 pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
442 filters: Vec<SimpleFilterContext>,
443 physical_filters: Vec<PhysicalFilterContext>,
444 codec: Arc<dyn PrimaryKeyCodec>,
445 metadata: RegionMetadataRef,
446 schema_version: u64,
447 arrow_schema: SchemaRef,
448}
449
450impl PrefilterContextBuilder {
451 pub(crate) fn new(
458 read_format: &FlatReadFormat,
459 codec: &Arc<dyn PrimaryKeyCodec>,
460 primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
461 primary_key_filter_expr_strs: Option<SmallVec<[String; 1]>>,
462 filters: Vec<SimpleFilterContext>,
463 physical_filters: Vec<PhysicalFilterContext>,
464 schema_version: u64,
465 ) -> Option<Self> {
466 let metadata = read_format.metadata();
467 let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
468 let pk_filters = (!use_raw_tag_columns)
469 .then_some(primary_key_filters)
470 .flatten()
471 .filter(|filters| !filters.is_empty());
472 let pk_filter_expr_strs = pk_filters
473 .is_some()
474 .then_some(primary_key_filter_expr_strs)
475 .flatten();
476
477 let mut prefilter_column_names = HashSet::new();
478 for filter_ctx in &filters {
479 if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
480 prefilter_column_names.insert(filter.column_name().to_string());
481 }
482 }
483
484 if pk_filters.is_some() {
485 prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
486 }
487
488 for filter_ctx in &physical_filters {
489 prefilter_column_names.insert(filter_ctx.column_name().to_string());
490 }
491
492 let prefilter_count =
493 compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
494
495 if prefilter_count == 0 {
496 return None;
497 }
498
499 let total_count = read_format.parquet_read_columns().root_indices().len();
500 let remaining_count = total_count.saturating_sub(prefilter_count);
501 if pk_filters.is_none() && prefilter_count >= total_count {
502 return None;
503 }
504
505 if pk_filters.is_none()
506 && !should_use_prefilter(prefilter_count, remaining_count, total_count)
507 {
508 return None;
509 }
510
511 Some(Self {
512 pk_filters,
513 pk_filter_expr_strs,
514 filters,
515 physical_filters,
516 codec: Arc::clone(codec),
517 metadata: metadata.clone(),
518 schema_version,
519 arrow_schema: read_format.arrow_schema().clone(),
520 })
521 }
522
523 pub(crate) fn build(&self) -> PrefilterContext {
525 let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
526 let pk_filter = self
527 .codec
528 .primary_key_filter(&self.metadata, Arc::clone(pk_filters));
529 Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
530 });
531 PrefilterContext {
532 pk_filter,
533 filters: self.filters.clone(),
534 physical_filters: self.physical_filters.clone(),
535 schema_version: self.schema_version,
536 pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
537 arrow_schema: self.arrow_schema.clone(),
538 }
539 }
540}
541
/// Largest share of the read columns that the prefilter columns may take for the prefilter to
/// be used (see `should_use_prefilter`).
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
/// Smallest number of columns that must remain outside the prefilter for it to be used.
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;
544
/// Outcome of running the prefilter.
pub(crate) struct PrefilterResult {
    /// Row selection after the prefilter mask has been applied.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows the prefilter removed.
    pub(crate) filtered_rows: usize,
}
552
553fn compute_projection_mask(
558 column_names: &HashSet<String>,
559 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
560 parquet_schema: &SchemaDescriptor,
561) -> ProjectionMask {
562 ProjectionMask::roots(
563 parquet_schema,
564 projection_indices(column_names, arrow_schema),
565 )
566}
567
568fn compute_projection_count(
569 column_names: &HashSet<String>,
570 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
571) -> usize {
572 projection_indices(column_names, arrow_schema).len()
573}
574
575fn projection_indices(
576 column_names: &HashSet<String>,
577 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
578) -> Vec<usize> {
579 let mut projection_indices: Vec<usize> = column_names
580 .iter()
581 .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
582 .collect();
583 projection_indices.sort_unstable();
584 projection_indices.dedup();
585 projection_indices
586}
587
588fn should_use_prefilter(
589 prefilter_count: usize,
590 remaining_count: usize,
591 total_count: usize,
592) -> bool {
593 if remaining_count == 0 {
594 return false;
595 }
596
597 if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
598 return false;
599 }
600
601 let ratio = prefilter_count as f64 / total_count as f64;
602 ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
603}
604
605pub(crate) async fn execute_prefilter(
606 prefilter_ctx: &mut PrefilterContext,
607 reader_builder: &RowGroupReaderBuilder,
608 build_ctx: &RowGroupBuildContext<'_>,
609) -> Result<PrefilterResult> {
610 let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
611
612 if entries.is_empty() {
613 return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
614 .await;
615 }
616
617 execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
618}
619
/// Runs the prefilter, reusing cached per-entry masks where available.
///
/// Masks found in the cache are AND-ed into a hit mask. Entries that miss the cache, plus the
/// physical filters that are not immutable, are evaluated by reading their columns. The final
/// mask is the conjunction of both parts.
async fn execute_prefilter_with_result_cache(
    prefilter_ctx: &mut PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
    entries: Vec<PrefilterEntry>,
) -> Result<PrefilterResult> {
    let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
    let mut hit_mask: Option<BooleanBuffer> = None;
    let mut misses = Vec::new();
    for entry in entries {
        // Entries without a key cannot be looked up, so they count as misses.
        let Some(key) = &entry.key else {
            misses.push(entry);
            continue;
        };

        if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
            hit_mask = Some(match hit_mask {
                Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
                None => mask.as_ref().clone(),
            });
        } else {
            misses.push(entry);
        }
    }

    // Everything was served from the cache: no column needs to be read.
    if misses.is_empty() && non_cacheable_physical.is_empty() {
        let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
        let refined_selection =
            refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
        let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
        let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
        return Ok(PrefilterResult {
            refined_selection,
            filtered_rows,
        });
    }

    // Evaluate the cache misses together with the physical filters that are never cached.
    let mut uncached_entries = misses;
    uncached_entries.extend(
        non_cacheable_physical
            .iter()
            .copied()
            .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
    );
    let (uncached_mask, read_rows) =
        build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;

    let final_mask = match (hit_mask, uncached_mask) {
        (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
        (Some(hit_mask), None) => hit_mask,
        (None, Some(uncached_mask)) => uncached_mask,
        (None, None) => BooleanBuffer::new_set(read_rows),
    };
    debug_assert_eq!(final_mask.len(), read_rows);
    let rows_selected = final_mask.count_set_bits();
    let filtered_rows = read_rows.saturating_sub(rows_selected);
    let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);

    Ok(PrefilterResult {
        refined_selection,
        filtered_rows,
    })
}
683
684fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec<usize> {
685 prefilter_ctx
686 .physical_filters
687 .iter()
688 .enumerate()
689 .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx))
690 .collect()
691}
692
/// Reads the columns required by `entries` and evaluates every entry on each batch.
///
/// Returns the conjunction of all entry masks (`None` when `entries` is empty) together with
/// the number of rows read. The full mask of every entry that carries a cache key is stored in
/// the prefilter result cache once the stream has been consumed.
async fn build_prefilter_masks(
    prefilter_ctx: &mut PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
    entries: &[PrefilterEntry],
) -> Result<(Option<BooleanBuffer>, usize)> {
    // Project only the columns that the given entries need.
    let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
    let parquet_schema = reader_builder
        .parquet_metadata()
        .file_metadata()
        .schema_descr();
    let projection = compute_projection_mask(
        &prefilter_column_names,
        &prefilter_ctx.arrow_schema,
        parquet_schema,
    );

    let mut stream = reader_builder
        .build_with_projection(
            build_ctx.row_group_idx,
            build_ctx.row_selection.clone(),
            projection,
            build_ctx.fetch_metrics,
        )
        .await?;

    // One builder per entry (only for entries with a cache key), parallel to `entries`.
    let mut cache_builders = entries
        .iter()
        .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
        .collect::<Vec<_>>();
    let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
    let mut rows_before_filter = 0usize;

    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        let num_rows = batch.num_rows();
        if num_rows == 0 {
            continue;
        }
        rows_before_filter += num_rows;

        // Start from an all-set mask and AND each entry's mask into it.
        let mut batch_mask = BooleanBuffer::new_set(num_rows);
        for (idx, entry) in entries.iter().enumerate() {
            let mask = eval_entry_mask(
                &batch,
                prefilter_ctx,
                entry.kind,
                reader_builder.file_path(),
            )?;
            batch_mask = batch_mask.bitand(&mask);
            if let Some(Some(builder)) = cache_builders.get_mut(idx) {
                builder.append_buffer(&mask);
            }
        }
        if let Some(builder) = &mut combined_builder {
            builder.append_buffer(&batch_mask);
        }
    }

    // Publish the per-entry masks only after the whole stream was read successfully.
    for (entry, builder) in entries.iter().zip(cache_builders) {
        if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
            reader_builder
                .cache_strategy()
                .put_prefilter_result(key.clone(), Arc::new(builder.finish()));
        }
    }

    Ok((
        combined_builder.map(|mut builder| builder.finish()),
        rows_before_filter,
    ))
}
765
766fn prefilter_column_names_for_entries(
767 prefilter_ctx: &PrefilterContext,
768 entries: &[PrefilterEntry],
769) -> HashSet<String> {
770 let mut prefilter_column_names = HashSet::new();
771 for entry in entries {
772 match entry.kind {
773 PrefilterEntryKind::Simple(idx) => {
774 if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() {
775 prefilter_column_names.insert(filter.column_name().to_string());
776 }
777 }
778 PrefilterEntryKind::Physical(idx) => {
779 prefilter_column_names.insert(
780 prefilter_ctx.physical_filters[idx]
781 .column_name()
782 .to_string(),
783 );
784 }
785 PrefilterEntryKind::PkGroup => {
786 prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
787 }
788 }
789 }
790 prefilter_column_names
791}
792
793async fn execute_prefilter_by_reading_columns(
794 prefilter_ctx: &mut PrefilterContext,
795 reader_builder: &RowGroupReaderBuilder,
796 build_ctx: &RowGroupBuildContext<'_>,
797) -> Result<PrefilterResult> {
798 let entries = all_prefilter_entries(prefilter_ctx);
799 let (mask, rows_before_filter) =
800 build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
801
802 let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
803 let rows_selected = final_mask.count_set_bits();
804 let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
805 let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
806
807 Ok(PrefilterResult {
808 refined_selection,
809 filtered_rows,
810 })
811}
812
813fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec<PrefilterEntry> {
814 let mut entries = Vec::new();
815 if prefilter_ctx.pk_filter.is_some() {
816 entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup));
817 }
818 entries.extend(
819 prefilter_ctx
820 .filters
821 .iter()
822 .enumerate()
823 .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))),
824 );
825 entries.extend(
826 prefilter_ctx
827 .physical_filters
828 .iter()
829 .enumerate()
830 .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
831 );
832 entries
833}
834
/// Identifies which filter of a [`PrefilterContext`] a prefilter entry refers to.
#[derive(Clone, Copy)]
enum PrefilterEntryKind {
    /// Index into the simple filters of the context.
    Simple(usize),
    /// Index into the physical filters of the context.
    Physical(usize),
    /// The primary key filter of the context.
    PkGroup,
}
841
/// A filter to evaluate during prefiltering, optionally paired with a result cache key.
struct PrefilterEntry {
    /// Which filter this entry refers to.
    kind: PrefilterEntryKind,
    /// Cache key for this entry's mask; `None` when the mask is neither looked up nor stored.
    key: Option<PrefilterKey>,
}

impl PrefilterEntry {
    /// Creates an entry without a cache key.
    fn without_cache(kind: PrefilterEntryKind) -> Self {
        Self { kind, key: None }
    }
}
852
853fn build_prefilter_cache_entries(
854 prefilter_ctx: &PrefilterContext,
855 reader_builder: &RowGroupReaderBuilder,
856 build_ctx: &RowGroupBuildContext<'_>,
857) -> Vec<PrefilterEntry> {
858 let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
859 let file_id = reader_builder.file_handle().file_id().file_id();
860 let row_group_idx = build_ctx.row_group_idx as u32;
861 let mut entries = Vec::new();
862
863 for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() {
864 entries.push(PrefilterEntry {
865 kind: PrefilterEntryKind::Simple(idx),
866 key: Some(PrefilterKey::new(
867 file_id,
868 row_group_idx,
869 row_selection.clone(),
870 prefilter_ctx.schema_version,
871 smallvec![filter_ctx.expr_str().to_string()],
872 )),
873 });
874 }
875
876 for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() {
877 if !filter_ctx.is_immutable() {
878 continue;
879 }
880 entries.push(PrefilterEntry {
881 kind: PrefilterEntryKind::Physical(idx),
882 key: Some(PrefilterKey::new(
883 file_id,
884 row_group_idx,
885 row_selection.clone(),
886 prefilter_ctx.schema_version,
887 smallvec![filter_ctx.expr_str().to_string()],
888 )),
889 });
890 }
891
892 if prefilter_ctx.pk_filter.is_some()
893 && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs
894 {
895 entries.push(PrefilterEntry {
896 kind: PrefilterEntryKind::PkGroup,
897 key: Some(PrefilterKey::new(
898 file_id,
899 row_group_idx,
900 row_selection,
901 prefilter_ctx.schema_version,
902 exprs.clone(),
903 )),
904 });
905 }
906
907 entries
908}
909
910fn rows_before_filter(
911 reader_builder: &RowGroupReaderBuilder,
912 build_ctx: &RowGroupBuildContext<'_>,
913) -> usize {
914 build_ctx.row_selection.as_ref().map_or_else(
915 || {
916 reader_builder
917 .parquet_metadata()
918 .row_group(build_ctx.row_group_idx)
919 .num_rows() as usize
920 },
921 RowSelection::row_count,
922 )
923}
924
925fn refined_selection_from_mask(
926 mask: &BooleanBuffer,
927 original_selection: &Option<RowSelection>,
928) -> RowSelection {
929 if mask.is_empty() || mask.count_set_bits() == 0 {
930 return RowSelection::from(vec![]);
931 }
932
933 let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]);
934 match original_selection {
935 Some(original) => original.and_then(&prefilter_selection),
936 None => prefilter_selection,
937 }
938}
939
940fn eval_entry_mask(
941 batch: &RecordBatch,
942 prefilter_ctx: &mut PrefilterContext,
943 kind: PrefilterEntryKind,
944 file_path: &str,
945) -> Result<BooleanBuffer> {
946 match kind {
947 PrefilterEntryKind::Simple(idx) => {
948 eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path)
949 }
950 PrefilterEntryKind::Physical(idx) => {
951 eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path)
952 }
953 PrefilterEntryKind::PkGroup => {
954 let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu {
955 reason: "Missing primary key filter for prefilter cache entry",
956 })?;
957 eval_pk_group_mask(batch, pk_filter.as_mut())
958 }
959 }
960}
961
962fn eval_pk_group_mask(
963 batch: &RecordBatch,
964 pk_filter: &mut dyn PrimaryKeyFilter,
965) -> Result<BooleanBuffer> {
966 let (pk_column_index, _) = batch
967 .schema()
968 .column_with_name(PRIMARY_KEY_COLUMN_NAME)
969 .context(UnexpectedSnafu {
970 reason: "Primary key column not found in prefilter batch",
971 })?;
972 let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?;
973 let mut builder = BooleanBufferBuilder::new(batch.num_rows());
974 builder.append_n(batch.num_rows(), false);
975 for range in matched_row_ranges {
976 for row in range {
977 builder.set_bit(row, true);
978 }
979 }
980 Ok(builder.finish())
981}
982
983fn eval_simple_filter_mask(
984 batch: &RecordBatch,
985 filter_ctx: &SimpleFilterContext,
986 file_path: &str,
987) -> Result<BooleanBuffer> {
988 let filter = match filter_ctx.filter() {
989 MaybeFilter::Filter(filter) => filter,
990 MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(batch.num_rows())),
991 MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(batch.num_rows())),
992 };
993
994 let (idx, _) = batch
995 .schema()
996 .column_with_name(filter.column_name())
997 .with_context(|| UnexpectedSnafu {
998 reason: format!(
999 "Prefilter column '{}' (id {}) not found in batch for file {}",
1000 filter.column_name(),
1001 filter_ctx.column_id(),
1002 file_path
1003 ),
1004 })?;
1005 let column = batch.column(idx).clone();
1006 filter.evaluate_array(&column).context(RecordBatchSnafu)
1007}
1008
1009fn eval_physical_filter_mask(
1010 batch: &RecordBatch,
1011 filter_ctx: &PhysicalFilterContext,
1012 file_path: &str,
1013) -> Result<BooleanBuffer> {
1014 let filter = filter_ctx.filter();
1015
1016 let (idx, _) = batch
1017 .schema()
1018 .column_with_name(filter_ctx.column_name())
1019 .with_context(|| UnexpectedSnafu {
1020 reason: format!(
1021 "Prefilter physical column '{}' (id {}) not found in batch for file {}",
1022 filter_ctx.column_name(),
1023 filter_ctx.column_id(),
1024 file_path
1025 ),
1026 })?;
1027 let column = batch.column(idx).clone();
1028
1029 let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
1030 .context(NewRecordBatchSnafu)?;
1031 let evaluated = filter
1032 .evaluate(&record_batch)
1033 .context(EvalPartitionFilterSnafu)?;
1034 let array = evaluated
1035 .into_array(record_batch.num_rows())
1036 .context(EvalPartitionFilterSnafu)?;
1037 let boolean_array = array
1038 .as_any()
1039 .downcast_ref::<BooleanArray>()
1040 .context(UnexpectedSnafu {
1041 reason: "Failed to downcast physical filter result to BooleanArray",
1042 })?;
1043 let mut result = boolean_array.values().clone();
1046 if let Some(nulls) = boolean_array.nulls() {
1047 result = result.bitand(nulls.inner());
1048 }
1049 Ok(result)
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054 use std::sync::Arc;
1055 use std::sync::atomic::{AtomicUsize, Ordering};
1056
1057 use common_recordbatch::filter::SimpleFilterEvaluator;
1058 use datafusion_expr::{col, lit};
1059 use datatypes::arrow::array::{
1060 ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
1061 };
1062 use datatypes::arrow::datatypes::{Schema, UInt32Type};
1063 use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
1064 use store_api::codec::PrimaryKeyEncoding;
1065
1066 use super::*;
1067 use crate::read::read_columns::ReadColumns;
1068 use crate::sst::internal_fields;
1069 use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
1070 use crate::test_util::sst_util::{
1071 new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
1072 sst_region_metadata_with_encoding,
1073 };
1074
1075 struct CountingPrimaryKeyFilter {
1076 hits: Arc<AtomicUsize>,
1077 expected: Vec<u8>,
1078 }
1079
1080 impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
1081 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
1082 self.hits.fetch_add(1, Ordering::Relaxed);
1083 Ok(pk == self.expected.as_slice())
1084 }
1085 }
1086
1087 #[test]
1088 fn test_cached_primary_key_filter_reuses_previous_result() {
1089 let expected = new_primary_key(&["a", "x"]);
1090 let hits = Arc::new(AtomicUsize::new(0));
1091 let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
1092 hits: Arc::clone(&hits),
1093 expected: expected.clone(),
1094 }));
1095
1096 assert!(filter.matches(expected.as_slice()).unwrap());
1097 assert!(filter.matches(expected.as_slice()).unwrap());
1098 assert!(
1099 !filter
1100 .matches(new_primary_key(&["b", "x"]).as_slice())
1101 .unwrap()
1102 );
1103
1104 assert_eq!(hits.load(Ordering::Relaxed), 2);
1105 }
1106
1107 fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
1108 exprs
1109 .iter()
1110 .filter_map(SimpleFilterEvaluator::try_new)
1111 .collect()
1112 }
1113
1114 fn new_simple_filter_contexts(
1115 metadata: &RegionMetadataRef,
1116 exprs: &[datafusion_expr::Expr],
1117 ) -> Vec<SimpleFilterContext> {
1118 exprs
1119 .iter()
1120 .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
1121 .collect()
1122 }
1123
1124 fn new_physical_filter_contexts(
1125 metadata: &RegionMetadataRef,
1126 read_format: &FlatReadFormat,
1127 exprs: &[datafusion_expr::Expr],
1128 ) -> Vec<PhysicalFilterContext> {
1129 exprs
1130 .iter()
1131 .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
1132 .collect()
1133 }
1134
1135 fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1136 assert_eq!(primary_keys.len(), field_values.len());
1137
1138 let metadata = Arc::new(sst_region_metadata());
1139 let arrow_schema = metadata.schema.arrow_schema();
1140 let field_column = arrow_schema
1141 .field(arrow_schema.index_of("field_0").unwrap())
1142 .clone();
1143 let time_index_column = arrow_schema
1144 .field(arrow_schema.index_of("ts").unwrap())
1145 .clone();
1146 let mut fields = vec![field_column, time_index_column];
1147 fields.extend(
1148 internal_fields()
1149 .into_iter()
1150 .map(|field| field.as_ref().clone()),
1151 );
1152 let schema = Arc::new(Schema::new(fields));
1153
1154 let mut dict_values = Vec::new();
1155 let mut keys = Vec::with_capacity(primary_keys.len());
1156 for pk in primary_keys {
1157 let key = dict_values
1158 .iter()
1159 .position(|existing: &&[u8]| existing == pk)
1160 .unwrap_or_else(|| {
1161 dict_values.push(*pk);
1162 dict_values.len() - 1
1163 });
1164 keys.push(key as u32);
1165 }
1166 let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1167 UInt32Array::from(keys),
1168 Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1169 ));
1170
1171 RecordBatch::try_new(
1172 schema,
1173 vec![
1174 Arc::new(UInt64Array::from(field_values.to_vec())),
1175 Arc::new(TimestampMillisecondArray::from_iter_values(
1176 0..primary_keys.len() as i64,
1177 )),
1178 pk_array,
1179 Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
1180 Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
1181 ],
1182 )
1183 .unwrap()
1184 }
1185
1186 fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1187 assert_eq!(primary_keys.len(), field_values.len());
1188
1189 let metadata = Arc::new(sst_region_metadata());
1190 let arrow_schema = metadata.schema.arrow_schema();
1191 let field_column = arrow_schema
1192 .field(arrow_schema.index_of("field_0").unwrap())
1193 .clone();
1194 let time_index_column = arrow_schema
1195 .field(arrow_schema.index_of("ts").unwrap())
1196 .clone();
1197 let schema = Arc::new(Schema::new(vec![
1198 field_column,
1199 time_index_column,
1200 internal_fields()[0].as_ref().clone(),
1201 ]));
1202
1203 let mut dict_values = Vec::new();
1204 let mut keys = Vec::with_capacity(primary_keys.len());
1205 for pk in primary_keys {
1206 let key = dict_values
1207 .iter()
1208 .position(|existing: &&[u8]| existing == pk)
1209 .unwrap_or_else(|| {
1210 dict_values.push(*pk);
1211 dict_values.len() - 1
1212 });
1213 keys.push(key as u32);
1214 }
1215 let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1216 UInt32Array::from(keys),
1217 Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1218 ));
1219
1220 RecordBatch::try_new(
1221 schema,
1222 vec![
1223 Arc::new(UInt64Array::from(field_values.to_vec())),
1224 Arc::new(TimestampMillisecondArray::from_iter_values(
1225 0..primary_keys.len() as i64,
1226 )),
1227 pk_array,
1228 ],
1229 )
1230 .unwrap()
1231 }
1232
1233 fn field_values(batch: &RecordBatch) -> Vec<u64> {
1234 batch
1235 .column(0)
1236 .as_any()
1237 .downcast_ref::<UInt64Array>()
1238 .unwrap()
1239 .values()
1240 .to_vec()
1241 }
1242
1243 fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
1244 filters
1245 .iter()
1246 .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
1247 .collect()
1248 }
1249
// A batch whose only primary key is ("a", "x") is filtered with
// `tag_0 = 'b'`; the prefilter is expected to return `None` for it.
#[test]
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
    let metadata = Arc::new(sst_region_metadata());
    let tag_is_b = col("tag_0").eq(lit("b"));
    let filters = Arc::new(new_test_filters(&[tag_is_b]));
    let codec = build_primary_key_codec(metadata.as_ref());
    let mut primary_key_filter = codec.primary_key_filter(&metadata, filters);

    let only_key = new_primary_key(&["a", "x"]);
    let batch = new_raw_batch(&[only_key.as_slice(), only_key.as_slice()], &[10, 11]);
    let pk_col_idx = primary_key_column_index(batch.num_columns());

    let outcome =
        prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut());

    assert!(outcome.unwrap().is_none());
}
1266
// Eight rows carry four keys (a, b, c, d), two consecutive rows each. With
// the filter `tag_0 = 'a' OR tag_0 = 'c'` the test expects the two
// non-adjacent pairs for a and c to survive.
#[test]
fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
    let metadata = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let tag_is_c = col("tag_0").eq(lit("c"));
    let filters = Arc::new(new_test_filters(&[tag_is_a.or(tag_is_c)]));
    let codec = build_primary_key_codec(metadata.as_ref());
    let mut primary_key_filter = codec.primary_key_filter(&metadata, filters);

    // One encoded key per tag value, each repeated for two rows.
    let encoded_keys = ["a", "b", "c", "d"].map(|tag| new_primary_key(&[tag, "x"]));
    let row_keys: Vec<&[u8]> = encoded_keys
        .iter()
        .flat_map(|pk| [pk.as_slice(), pk.as_slice()])
        .collect();
    let batch = new_raw_batch(&row_keys, &[10, 11, 12, 13, 14, 15, 16, 17]);
    let pk_col_idx = primary_key_column_index(batch.num_columns());

    let kept =
        prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
            .unwrap()
            .unwrap();

    assert_eq!(kept.num_rows(), 4);
    assert_eq!(field_values(&kept), vec![10, 11, 14, 15]);
}
1302
// With no predicate-derived inputs and two empty filter lists,
// `PrefilterContextBuilder::new` is expected to return `None`.
#[test]
fn test_prefilter_builder_returns_none_without_selected_filters() {
    let metadata: RegionMetadataRef =
        Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
    // Read every column declared in the region metadata.
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();
    let codec = build_primary_key_codec(metadata.as_ref());

    let maybe_builder = PrefilterContextBuilder::new(
        &read_format,
        &codec,
        None,
        None,
        Vec::new(),
        Vec::new(),
        metadata.schema_version,
    );

    assert!(maybe_builder.is_none());
}
1330
// Table of `should_use_prefilter` argument triples and the decision the
// function is expected to return for each of them.
#[test]
fn test_should_use_prefilter() {
    let cases = [
        ((1, 5, 6), true),
        ((1, 0, 1), false),
        ((1, 1, 2), false),
        ((4, 3, 7), false),
        ((3, 3, 6), true),
    ];

    for ((first, second, third), expected) in cases {
        assert_eq!(should_use_prefilter(first, second, third), expected);
    }
}
1339
// Checks how `build_bulk_filter_plan` splits predicates into `pk_filters` and
// `remaining_simple_filters` for two read formats: one that reports no raw
// primary key columns and one that reports them.
#[test]
fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
    // Read format without raw primary key columns (sparse encoding).
    let sparse_metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));
    let sparse_columns = ReadColumns::from_deduped_column_ids(
        sparse_metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let legacy_read_format = FlatReadFormat::new(
        sparse_metadata.clone(),
        sparse_columns,
        None,
        "memtable",
        false,
    )
    .unwrap();
    assert!(!legacy_read_format.batch_has_raw_pk_columns());

    // The tag predicate becomes a primary key filter; the field predicate
    // stays in the remaining simple filters.
    let mixed_predicate = Predicate::new(vec![
        col("tag_0").eq(lit("a")),
        col("field_0").gt(lit(1_u64)),
    ]);
    let plan = build_bulk_filter_plan(&legacy_read_format, Some(&mixed_predicate));
    let pk_filter_count = plan.pk_filters.as_ref().map(|filters| filters.len());
    assert_eq!(pk_filter_count, Some(1));
    assert_eq!(
        remaining_simple_filter_columns(&plan.remaining_simple_filters),
        vec!["field_0"]
    );

    // Read format that exposes raw primary key columns.
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let raw_pk_read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "memtable", true).unwrap();
    assert!(raw_pk_read_format.batch_has_raw_pk_columns());

    // A tag predicate is kept as a simple filter; no primary key filter.
    let tag_predicate = Predicate::new(vec![col("tag_0").eq(lit("a"))]);
    let tag_only_plan = build_bulk_filter_plan(&raw_pk_read_format, Some(&tag_predicate));
    assert!(tag_only_plan.pk_filters.is_none());
    assert_eq!(
        remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
        vec!["tag_0"]
    );

    // A field predicate is likewise kept as a simple filter.
    let field_predicate = Predicate::new(vec![col("field_0").gt(lit(1_u64))]);
    let field_only_plan = build_bulk_filter_plan(&raw_pk_read_format, Some(&field_predicate));
    assert!(field_only_plan.pk_filters.is_none());
    assert_eq!(
        remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
        vec!["field_0"]
    );
}
1406
// Checks `build_reader_filter_plan` under two configurations:
// `PreFilterMode::SkipFields` with every column read, and `PreFilterMode::All`
// with only `field_0` and the time index read.
#[test]
fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let full_read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", true).unwrap();
    let codec = build_primary_key_codec(metadata.as_ref());

    // SkipFields: a prefilter builder exists and the field predicate remains
    // among the simple filters.
    let mixed_predicate = Predicate::new(vec![
        col("tag_0").eq(lit("a")),
        col("field_0").gt(lit(1_u64)),
    ]);
    let skip_fields_plan = build_reader_filter_plan(
        Some(&mixed_predicate),
        None,
        PreFilterMode::SkipFields,
        &full_read_format,
        &codec,
    );
    assert!(skip_fields_plan.prefilter_builder.is_some());
    assert_eq!(
        remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
        vec!["field_0"]
    );

    // All, with tag columns left out of the projection: the tag predicate
    // yields a prefilter builder and no remaining simple filter.
    let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
    let ts = metadata.time_index_column().column_id;
    let projected_columns = ReadColumns::from_deduped_column_ids([field_0, ts]);
    let projected_read_format =
        FlatReadFormat::new(metadata.clone(), projected_columns, None, "test", true).unwrap();
    let tag_predicate = Predicate::new(vec![col("tag_0").eq(lit("a"))]);
    let pk_prefilter_plan = build_reader_filter_plan(
        Some(&tag_predicate),
        None,
        PreFilterMode::All,
        &projected_read_format,
        &codec,
    );
    assert!(pk_prefilter_plan.prefilter_builder.is_some());
    assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
}
1458
// Builds two reader filter plans from the same pair of tag predicates given
// in opposite order and expects their `pk_filter_expr_strs` to be present and
// equal.
#[test]
fn test_pk_filter_expr_strings_are_stable_under_expr_order() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();
    let codec = build_primary_key_codec(metadata.as_ref());

    let expr_a = col("tag_0").eq(lit("a"));
    let expr_b = col("tag_1").eq(lit("x"));
    let forward_predicate = Predicate::new(vec![expr_a.clone(), expr_b.clone()]);
    let reversed_predicate = Predicate::new(vec![expr_b, expr_a]);

    let plan_ab = build_reader_filter_plan(
        Some(&forward_predicate),
        None,
        PreFilterMode::All,
        &read_format,
        &codec,
    );
    let plan_b_a = build_reader_filter_plan(
        Some(&reversed_predicate),
        None,
        PreFilterMode::All,
        &read_format,
        &codec,
    );

    let exprs_ab = plan_ab.prefilter_builder.unwrap().pk_filter_expr_strs;
    let exprs_b_a = plan_b_a.prefilter_builder.unwrap().pk_filter_expr_strs;
    assert!(exprs_ab.is_some());
    assert_eq!(exprs_ab, exprs_b_a);
}
1498
// `expr_str()` on both context kinds is expected to equal the `Debug`
// rendering of the expression the context was built from.
#[test]
fn test_simple_and_physical_contexts_preserve_expr_strings() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", true).unwrap();

    let simple_expr = col("tag_0").eq(lit("a"));
    let simple_ctx = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap();
    let expected_simple = format!("{simple_expr:?}");
    assert_eq!(simple_ctx.expr_str(), expected_simple);

    let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false);
    let physical_ctx =
        PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap();
    let expected_physical = format!("{physical_expr:?}");
    assert_eq!(physical_ctx.expr_str(), expected_physical);
}
1522
// Evaluates `tag_0 = 'a'` on a batch produced by
// `new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1)` and expects
// four bits set in the resulting mask.
#[test]
fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let filters = new_simple_filter_contexts(&metadata, &[tag_is_a]);
    let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);

    let tag_filter = &filters[0];
    let mask = eval_simple_filter_mask(&batch, tag_filter, "test").unwrap();

    assert_eq!(mask.count_set_bits(), 4);
}
1532
// Evaluates a `tag_0` filter on a batch built by `new_raw_batch`, which adds
// no `tag_0` field of its own, and expects an error whose message mentions
// both "Prefilter column" and the column name.
#[test]
fn test_eval_simple_filter_mask_errors_on_missing_selected_column() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let filters = new_simple_filter_contexts(&metadata, &[tag_is_a]);
    let encoded_key = new_primary_key(&["a", "x"]);
    let batch = new_raw_batch(&[encoded_key.as_slice()], &[10]);

    let failure = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap_err();
    let message = failure.to_string();

    assert!(message.contains("Prefilter column"));
    assert!(message.contains("tag_0"));
}
1545
// Evaluates `field_0 IN (11)` as a physical filter on three rows whose field
// values are 9, 10 and 11, and expects exactly one bit set in the mask.
#[test]
fn test_eval_physical_filter_mask_evaluates_physical_filters() {
    let metadata: RegionMetadataRef =
        Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|c| c.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();

    let field_in_list = col("field_0").in_list(vec![lit(11_u64)], false);
    let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[field_in_list]);

    let encoded_key = new_primary_key(&["a", "x"]);
    let row_keys = [
        encoded_key.as_slice(),
        encoded_key.as_slice(),
        encoded_key.as_slice(),
    ];
    let batch = new_raw_batch(&row_keys, &[9, 10, 11]);

    let mask = eval_physical_filter_mask(&batch, &physical_filters[0], "test").unwrap();

    assert_eq!(mask.count_set_bits(), 1);
}
1568
// Runs `eval_pk_group_mask` with a `CachedPrimaryKeyFilter` for
// `tag_0 = 'a'` over a batch from `new_prefilter_batch` (two rows of key a,
// then two rows of key b) and expects two bits set in the mask.
#[test]
fn test_eval_pk_group_mask_finds_pk_column_by_name() {
    let metadata = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let filters = Arc::new(new_test_filters(&[tag_is_a]));
    let inner_filter =
        build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
    let mut pk_filter: Box<dyn PrimaryKeyFilter> =
        Box::new(CachedPrimaryKeyFilter::new(inner_filter));

    let pk_a = new_primary_key(&["a", "x"]);
    let pk_b = new_primary_key(&["b", "x"]);
    let row_keys = [
        pk_a.as_slice(),
        pk_a.as_slice(),
        pk_b.as_slice(),
        pk_b.as_slice(),
    ];
    let batch = new_prefilter_batch(&row_keys, &[10, 11, 12, 13]);

    let mask = eval_pk_group_mask(&batch, pk_filter.as_mut()).unwrap();

    assert_eq!(mask.count_set_bits(), 2);
}
1592}