1use std::collections::HashSet;
22use std::ops::{BitAnd, Range};
23use std::sync::Arc;
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::datatypes::SchemaRef;
30use datatypes::arrow::record_batch::RecordBatch;
31use futures::StreamExt;
32use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
33use parquet::arrow::ProjectionMask;
34use parquet::arrow::arrow_reader::RowSelection;
35use parquet::schema::types::SchemaDescriptor;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
40use table::predicate::Predicate;
41
42use crate::cache::PrefilterKey;
43use crate::error::{
44 ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
45 RecordBatchSnafu, Result, UnexpectedSnafu,
46};
47use crate::sst::parquet::file_range::PreFilterMode;
48use crate::sst::parquet::flat_format::FlatReadFormat;
49use crate::sst::parquet::format::PrimaryKeyArray;
50use crate::sst::parquet::reader::{
51 MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
52 SimpleFilterContext,
53};
54
55pub(crate) fn matching_row_ranges_by_primary_key(
56 input: &RecordBatch,
57 pk_column_index: usize,
58 pk_filter: &mut dyn PrimaryKeyFilter,
59) -> Result<Vec<Range<usize>>> {
60 let pk_column = input.column(pk_column_index);
61 if let Some(pk_dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
62 matching_row_ranges_from_dict(pk_dict_array, input.num_rows(), pk_filter)
63 } else if let Some(pk_binary_array) = pk_column.as_any().downcast_ref::<BinaryArray>() {
64 matching_row_ranges_from_binary(pk_binary_array, input.num_rows(), pk_filter)
65 } else {
66 UnexpectedSnafu {
67 reason: format!(
68 "Primary key column is neither a dictionary nor a binary array, got {:?}",
69 pk_column.data_type()
70 ),
71 }
72 .fail()
73 }
74}
75
76fn push_matched_range(
79 matched_row_ranges: &mut Vec<Range<usize>>,
80 pk_filter: &mut dyn PrimaryKeyFilter,
81 pk: &[u8],
82 start: usize,
83 end: usize,
84) -> Result<()> {
85 if pk_filter.matches(pk).context(DecodeSnafu)? {
86 if let Some(last) = matched_row_ranges.last_mut()
87 && last.end == start
88 {
89 last.end = end;
90 } else {
91 matched_row_ranges.push(start..end);
92 }
93 }
94 Ok(())
95}
96
97fn matching_row_ranges_from_dict(
99 pk_dict_array: &PrimaryKeyArray,
100 num_rows: usize,
101 pk_filter: &mut dyn PrimaryKeyFilter,
102) -> Result<Vec<Range<usize>>> {
103 let pk_values = pk_dict_array
104 .values()
105 .as_any()
106 .downcast_ref::<BinaryArray>()
107 .context(UnexpectedSnafu {
108 reason: "Primary key values are not binary array",
109 })?;
110 let keys = pk_dict_array.keys();
111 let key_values = keys.values();
112
113 if key_values.is_empty() {
114 return Ok(std::iter::once(0..num_rows).collect());
115 }
116
117 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
118 let mut start = 0;
119 while start < key_values.len() {
120 let key = key_values[start];
121 let mut end = start + 1;
122 while end < key_values.len() && key_values[end] == key {
123 end += 1;
124 }
125
126 push_matched_range(
127 &mut matched_row_ranges,
128 pk_filter,
129 pk_values.value(key as usize),
130 start,
131 end,
132 )?;
133
134 start = end;
135 }
136
137 Ok(matched_row_ranges)
138}
139
140fn matching_row_ranges_from_binary(
146 pk_array: &BinaryArray,
147 num_rows: usize,
148 pk_filter: &mut dyn PrimaryKeyFilter,
149) -> Result<Vec<Range<usize>>> {
150 if pk_array.is_empty() {
151 return Ok(std::iter::once(0..num_rows).collect());
152 }
153
154 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
155 let mut start = 0;
156 while start < pk_array.len() {
157 let value = pk_array.value(start);
158 let mut end = start + 1;
159 while end < pk_array.len() && pk_array.value(end) == value {
160 end += 1;
161 }
162
163 push_matched_range(&mut matched_row_ranges, pk_filter, value, start, end)?;
164
165 start = end;
166 }
167
168 Ok(matched_row_ranges)
169}
170
171pub(crate) fn prefilter_flat_batch_by_primary_key(
174 input: RecordBatch,
175 pk_column_index: usize,
176 pk_filter: &mut dyn PrimaryKeyFilter,
177) -> Result<Option<RecordBatch>> {
178 if input.num_rows() == 0 {
179 return Ok(Some(input));
180 }
181
182 let matched_row_ranges =
183 matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
184 if matched_row_ranges.is_empty() {
185 return Ok(None);
186 }
187
188 if matched_row_ranges.len() == 1
189 && matched_row_ranges[0].start == 0
190 && matched_row_ranges[0].end == input.num_rows()
191 {
192 return Ok(Some(input));
193 }
194
195 if matched_row_ranges.len() == 1 {
196 let span = &matched_row_ranges[0];
197 return Ok(Some(input.slice(span.start, span.end - span.start)));
198 }
199
200 let mut builder = BooleanBufferBuilder::new(input.num_rows());
201 builder.append_n(input.num_rows(), false);
202 for span in matched_row_ranges {
203 for i in span {
204 builder.set_bit(i, true);
205 }
206 }
207
208 let filtered = datatypes::arrow::compute::filter_record_batch(
209 &input,
210 &BooleanArray::new(builder.finish(), None),
211 )
212 .context(ComputeArrowSnafu)?;
213 if filtered.num_rows() == 0 {
214 Ok(None)
215 } else {
216 Ok(Some(filtered))
217 }
218}
219
/// A [`PrimaryKeyFilter`] wrapper that remembers the most recently evaluated primary key
/// together with its result, so that a repeated call with the same key bytes does not
/// reach the inner filter again.
pub(crate) struct CachedPrimaryKeyFilter {
    /// The wrapped filter that performs the actual evaluation.
    inner: Box<dyn PrimaryKeyFilter>,
    /// Bytes of the primary key from the last successful inner evaluation.
    last_primary_key: Vec<u8>,
    /// Result cached for `last_primary_key`; `None` until a key has been evaluated.
    last_match: Option<bool>,
}
225
226impl CachedPrimaryKeyFilter {
227 pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
228 Self {
229 inner,
230 last_primary_key: Vec::new(),
231 last_match: None,
232 }
233 }
234}
235
236impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
237 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
238 if let Some(last_match) = self.last_match
239 && self.last_primary_key == pk
240 {
241 return Ok(last_match);
242 }
243
244 let matched = self.inner.matches(pk)?;
245 self.last_primary_key.clear();
246 self.last_primary_key.extend_from_slice(pk);
247 self.last_match = Some(matched);
248 Ok(matched)
249 }
250}
251
/// Filter split produced by `build_bulk_filter_plan`.
pub(crate) struct BulkFilterPlan {
    /// Simple filters that were not extracted as primary key filters.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Evaluators of the tag-column filters extracted for primary key filtering, or
    /// `None` when nothing was extracted.
    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
264
/// Filter split produced by `build_reader_filter_plan`.
pub(crate) struct ReaderFilterPlan {
    /// Simple filters that are not handled by the prefilter.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Builder for the per-row-group prefilter context, or `None` when no prefilter is
    /// planned.
    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
}
283
284pub(crate) fn build_bulk_filter_plan(
285 read_format: &FlatReadFormat,
286 predicate: Option<&Predicate>,
287) -> BulkFilterPlan {
288 let metadata = read_format.metadata();
289 let simple_filters: Vec<SimpleFilterContext> = predicate
292 .into_iter()
293 .flat_map(|predicate| {
294 predicate
295 .exprs()
296 .iter()
297 .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
298 })
299 .collect();
300
301 if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
305 return BulkFilterPlan {
306 remaining_simple_filters: simple_filters,
307 pk_filters: None,
308 };
309 }
310
311 let mut remaining_simple_filters = Vec::new();
312 let mut pk_filters = Vec::new();
313
314 for filter_ctx in simple_filters {
315 let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
318 (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
319 });
320
321 if let Some(pk_filter) = pk_filter {
322 pk_filters.push(pk_filter);
323 } else {
324 remaining_simple_filters.push(filter_ctx);
325 }
326 }
327
328 BulkFilterPlan {
329 remaining_simple_filters,
330 pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
331 }
332}
333
/// Splits the filters of `predicate` into those evaluated by the prefilter and those
/// left as remaining simple filters.
///
/// Each expression is classified as follows:
/// - a simple filter without an evaluator stays in the remaining filters;
/// - a simple filter whose column can be prefiltered directly joins the prefilter;
/// - a tag filter, when the batch has no raw primary key columns, becomes a primary key
///   filter;
/// - any other simple filter stays in the remaining filters;
/// - an expression that is not a simple filter is tried as a physical filter and kept
///   only when its column can be prefiltered directly.
///
/// When `PrefilterContextBuilder::new` declines to build a prefilter, the simple and
/// primary key filters collected for it are appended back to the remaining filters.
///
/// # Panics
///
/// Panics when a directly prefiltered simple filter references a column that is absent
/// from the read format's arrow schema.
pub(crate) fn build_reader_filter_plan(
    predicate: Option<&Predicate>,
    expected_metadata: Option<&RegionMetadata>,
    pre_filter_mode: PreFilterMode,
    read_format: &FlatReadFormat,
    codec: &Arc<dyn PrimaryKeyCodec>,
) -> ReaderFilterPlan {
    // Without a predicate there is nothing to plan.
    let Some(predicate) = predicate else {
        return ReaderFilterPlan {
            remaining_simple_filters: Vec::new(),
            prefilter_builder: None,
        };
    };

    let metadata = read_format.metadata();
    let mut prefilter_simple_filters = Vec::new();
    let mut remaining_simple_filters = Vec::new();
    let mut prefilter_physical_filters = Vec::new();
    let mut primary_key_filters = Vec::new();
    let mut pk_filter_contexts = Vec::new();

    // Field columns are only prefiltered in `PreFilterMode::All`.
    let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
    // Tag filters go through the primary key filter when the batch has no raw
    // primary key columns.
    let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();

    // Whether a filter on a column of this semantic type can be evaluated directly
    // on that column during the prefilter.
    let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
        match semantic_type {
            SemanticType::Tag => !need_pk_prefilter,
            SemanticType::Field => field_prefilter_enabled,
            SemanticType::Timestamp => true,
        }
    };

    for expr in predicate.exprs() {
        if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
            // A context without an evaluator is kept as a remaining filter.
            let Some(filter) = filter_ctx.filter().as_filter() else {
                remaining_simple_filters.push(filter_ctx);
                continue;
            };

            let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
            if direct_prefilter {
                assert!(
                    read_format
                        .arrow_schema()
                        .column_with_name(filter.column_name())
                        .is_some(),
                    "Column '{}' is not present in the arrow schema {:?}",
                    filter.column_name(),
                    read_format.arrow_schema(),
                );
                prefilter_simple_filters.push(filter_ctx);
                continue;
            }

            if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
                // Keep both the evaluator and its context: the context is needed for
                // the expression strings and for the fallback below.
                primary_key_filters.push(filter.clone());
                pk_filter_contexts.push(filter_ctx);
            } else {
                remaining_simple_filters.push(filter_ctx);
            }
            continue;
        }

        // Not a simple filter: keep it as a physical prefilter only when its column
        // can be prefiltered directly. Otherwise it is not recorded by this function.
        if let Some(filter) =
            PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
            && can_direct_prefilter(filter.semantic_type())
        {
            prefilter_physical_filters.push(filter);
        }
    }

    // Sorted expression strings of the primary key filters, `None` when there are none.
    let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
        let mut expr_strs = pk_filter_contexts
            .iter()
            .map(|filter_ctx| filter_ctx.expr_str().to_string())
            .collect::<Vec<_>>();
        expr_strs.sort();
        SmallVec::from_vec(expr_strs)
    });
    let pk_filter_exprs =
        (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
    // Prefer the schema version of the expected metadata when it is provided.
    let schema_version = expected_metadata
        .map(|metadata| metadata.schema_version)
        .unwrap_or_else(|| read_format.metadata().schema_version);
    // The simple filters are cloned because they are still needed for the fallback
    // when the builder is not created.
    let prefilter_builder = PrefilterContextBuilder::new(
        read_format,
        codec,
        pk_filter_exprs,
        pk_filter_expr_strs,
        prefilter_simple_filters.clone(),
        prefilter_physical_filters,
        schema_version,
    );

    if prefilter_builder.is_some() {
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder,
        }
    } else {
        // No prefilter: hand the collected simple and primary key filters back.
        remaining_simple_filters.extend(prefilter_simple_filters);
        remaining_simple_filters.extend(pk_filter_contexts);
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder: None,
        }
    }
}
478
/// State used while prefiltering a row group, created by `PrefilterContextBuilder::build`.
pub(crate) struct PrefilterContext {
    /// Filter applied to the primary key column, if any primary key filters exist.
    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Simple filters evaluated on their own columns.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters evaluated on their own columns.
    physical_filters: Vec<PhysicalFilterContext>,
    /// Schema version included in every prefilter cache key.
    schema_version: u64,
    /// Expression strings that identify the primary key filter group in its cache key.
    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
    /// Arrow schema used to resolve prefilter column names to projection indices.
    arrow_schema: SchemaRef,
}
495
/// Holds what is needed to create a fresh [`PrefilterContext`] via `build`.
pub(crate) struct PrefilterContextBuilder {
    /// Evaluators for the primary key filter, if any.
    pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Expression strings of the primary key filters; only kept when `pk_filters` is set.
    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
    /// Simple filters copied into every built context.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters copied into every built context.
    physical_filters: Vec<PhysicalFilterContext>,
    /// Codec used to create the primary key filter.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Region metadata passed to the codec when creating the primary key filter.
    metadata: RegionMetadataRef,
    /// Schema version copied into every built context.
    schema_version: u64,
    /// Arrow schema of the read format, copied into every built context.
    arrow_schema: SchemaRef,
}
511
512impl PrefilterContextBuilder {
513 pub(crate) fn new(
520 read_format: &FlatReadFormat,
521 codec: &Arc<dyn PrimaryKeyCodec>,
522 primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
523 primary_key_filter_expr_strs: Option<SmallVec<[String; 1]>>,
524 filters: Vec<SimpleFilterContext>,
525 physical_filters: Vec<PhysicalFilterContext>,
526 schema_version: u64,
527 ) -> Option<Self> {
528 let metadata = read_format.metadata();
529 let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
530 let pk_filters = (!use_raw_tag_columns)
531 .then_some(primary_key_filters)
532 .flatten()
533 .filter(|filters| !filters.is_empty());
534 let pk_filter_expr_strs = pk_filters
535 .is_some()
536 .then_some(primary_key_filter_expr_strs)
537 .flatten();
538
539 let mut prefilter_column_names = HashSet::new();
540 for filter_ctx in &filters {
541 if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
542 prefilter_column_names.insert(filter.column_name().to_string());
543 }
544 }
545
546 if pk_filters.is_some() {
547 prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
548 }
549
550 for filter_ctx in &physical_filters {
551 prefilter_column_names.insert(filter_ctx.column_name().to_string());
552 }
553
554 let prefilter_count =
555 compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
556
557 if prefilter_count == 0 {
558 return None;
559 }
560
561 let total_count = read_format.parquet_read_columns().root_indices().len();
562 let remaining_count = total_count.saturating_sub(prefilter_count);
563 if pk_filters.is_none() && prefilter_count >= total_count {
564 return None;
565 }
566
567 if pk_filters.is_none()
568 && !should_use_prefilter(prefilter_count, remaining_count, total_count)
569 {
570 return None;
571 }
572
573 Some(Self {
574 pk_filters,
575 pk_filter_expr_strs,
576 filters,
577 physical_filters,
578 codec: Arc::clone(codec),
579 metadata: metadata.clone(),
580 schema_version,
581 arrow_schema: read_format.arrow_schema().clone(),
582 })
583 }
584
585 pub(crate) fn build(&self) -> PrefilterContext {
587 let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
588 let pk_filter = self
589 .codec
590 .primary_key_filter(&self.metadata, Arc::clone(pk_filters));
591 Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
592 });
593 PrefilterContext {
594 pk_filter,
595 filters: self.filters.clone(),
596 physical_filters: self.physical_filters.clone(),
597 schema_version: self.schema_version,
598 pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
599 arrow_schema: self.arrow_schema.clone(),
600 }
601 }
602}
603
/// Largest ratio of prefilter columns to total read columns that `should_use_prefilter`
/// still accepts.
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
/// Smallest number of non-prefilter columns that `should_use_prefilter` still accepts.
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;
606
/// Outcome of prefiltering one row group.
pub(crate) struct PrefilterResult {
    /// Row selection after applying the prefilter mask.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows removed by the prefilter.
    pub(crate) filtered_rows: usize,
}
614
615fn compute_projection_mask(
620 column_names: &HashSet<String>,
621 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
622 parquet_schema: &SchemaDescriptor,
623) -> ProjectionMask {
624 ProjectionMask::roots(
625 parquet_schema,
626 projection_indices(column_names, arrow_schema),
627 )
628}
629
630fn compute_projection_count(
631 column_names: &HashSet<String>,
632 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
633) -> usize {
634 projection_indices(column_names, arrow_schema).len()
635}
636
637fn projection_indices(
638 column_names: &HashSet<String>,
639 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
640) -> Vec<usize> {
641 let mut projection_indices: Vec<usize> = column_names
642 .iter()
643 .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
644 .collect();
645 projection_indices.sort_unstable();
646 projection_indices.dedup();
647 projection_indices
648}
649
650fn should_use_prefilter(
651 prefilter_count: usize,
652 remaining_count: usize,
653 total_count: usize,
654) -> bool {
655 if remaining_count == 0 {
656 return false;
657 }
658
659 if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
660 return false;
661 }
662
663 let ratio = prefilter_count as f64 / total_count as f64;
664 ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
665}
666
667pub(crate) async fn execute_prefilter(
668 prefilter_ctx: &mut PrefilterContext,
669 reader_builder: &RowGroupReaderBuilder,
670 build_ctx: &RowGroupBuildContext<'_>,
671) -> Result<PrefilterResult> {
672 let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
673
674 if entries.is_empty() {
675 return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
676 .await;
677 }
678
679 execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
680}
681
682async fn execute_prefilter_with_result_cache(
683 prefilter_ctx: &mut PrefilterContext,
684 reader_builder: &RowGroupReaderBuilder,
685 build_ctx: &RowGroupBuildContext<'_>,
686 entries: Vec<PrefilterEntry>,
687) -> Result<PrefilterResult> {
688 let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
689 let mut hit_mask: Option<BooleanBuffer> = None;
690 let mut misses = Vec::new();
691 for entry in entries {
692 let Some(key) = &entry.key else {
693 misses.push(entry);
694 continue;
695 };
696
697 if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
698 hit_mask = Some(match hit_mask {
699 Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
700 None => mask.as_ref().clone(),
701 });
702 } else {
703 misses.push(entry);
704 }
705 }
706
707 if misses.is_empty() && non_cacheable_physical.is_empty() {
708 let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
709 let refined_selection =
710 refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
711 let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
712 let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
713 return Ok(PrefilterResult {
714 refined_selection,
715 filtered_rows,
716 });
717 }
718
719 let mut uncached_entries = misses;
720 uncached_entries.extend(
721 non_cacheable_physical
722 .iter()
723 .copied()
724 .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
725 );
726 let (uncached_mask, read_rows) =
727 build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;
728
729 let final_mask = match (hit_mask, uncached_mask) {
730 (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
731 (Some(hit_mask), None) => hit_mask,
732 (None, Some(uncached_mask)) => uncached_mask,
733 (None, None) => BooleanBuffer::new_set(read_rows),
734 };
735 debug_assert_eq!(final_mask.len(), read_rows);
736 let rows_selected = final_mask.count_set_bits();
737 let filtered_rows = read_rows.saturating_sub(rows_selected);
738 let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
739
740 Ok(PrefilterResult {
741 refined_selection,
742 filtered_rows,
743 })
744}
745
746fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec<usize> {
747 prefilter_ctx
748 .physical_filters
749 .iter()
750 .enumerate()
751 .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx))
752 .collect()
753}
754
755async fn build_prefilter_masks(
756 prefilter_ctx: &mut PrefilterContext,
757 reader_builder: &RowGroupReaderBuilder,
758 build_ctx: &RowGroupBuildContext<'_>,
759 entries: &[PrefilterEntry],
760) -> Result<(Option<BooleanBuffer>, usize)> {
761 let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
762 let parquet_schema = reader_builder
763 .parquet_metadata()
764 .file_metadata()
765 .schema_descr();
766 let projection = compute_projection_mask(
767 &prefilter_column_names,
768 &prefilter_ctx.arrow_schema,
769 parquet_schema,
770 );
771
772 let mut stream = reader_builder
773 .build_with_projection(
774 build_ctx.row_group_idx,
775 build_ctx.row_selection.clone(),
776 projection,
777 build_ctx.fetch_metrics,
778 )
779 .await?;
780
781 let mut cache_builders = entries
782 .iter()
783 .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
784 .collect::<Vec<_>>();
785 let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
786 let mut rows_before_filter = 0usize;
787
788 while let Some(batch_result) = stream.next().await {
789 let batch = batch_result?;
790 let num_rows = batch.num_rows();
791 if num_rows == 0 {
792 continue;
793 }
794 rows_before_filter += num_rows;
795
796 let mut batch_mask = BooleanBuffer::new_set(num_rows);
797 for (idx, entry) in entries.iter().enumerate() {
798 let mask = eval_entry_mask(
799 &batch,
800 prefilter_ctx,
801 entry.kind,
802 reader_builder.file_path(),
803 )?;
804 batch_mask = batch_mask.bitand(&mask);
805 if let Some(Some(builder)) = cache_builders.get_mut(idx) {
806 builder.append_buffer(&mask);
807 }
808 }
809 if let Some(builder) = &mut combined_builder {
810 builder.append_buffer(&batch_mask);
811 }
812 }
813
814 for (entry, builder) in entries.iter().zip(cache_builders) {
815 if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
816 reader_builder
817 .cache_strategy()
818 .put_prefilter_result(key.clone(), Arc::new(builder.finish()));
819 }
820 }
821
822 Ok((
823 combined_builder.map(|mut builder| builder.finish()),
824 rows_before_filter,
825 ))
826}
827
828fn prefilter_column_names_for_entries(
829 prefilter_ctx: &PrefilterContext,
830 entries: &[PrefilterEntry],
831) -> HashSet<String> {
832 let mut prefilter_column_names = HashSet::new();
833 for entry in entries {
834 match entry.kind {
835 PrefilterEntryKind::Simple(idx) => {
836 if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() {
837 prefilter_column_names.insert(filter.column_name().to_string());
838 }
839 }
840 PrefilterEntryKind::Physical(idx) => {
841 prefilter_column_names.insert(
842 prefilter_ctx.physical_filters[idx]
843 .column_name()
844 .to_string(),
845 );
846 }
847 PrefilterEntryKind::PkGroup => {
848 prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
849 }
850 }
851 }
852 prefilter_column_names
853}
854
855async fn execute_prefilter_by_reading_columns(
856 prefilter_ctx: &mut PrefilterContext,
857 reader_builder: &RowGroupReaderBuilder,
858 build_ctx: &RowGroupBuildContext<'_>,
859) -> Result<PrefilterResult> {
860 let entries = all_prefilter_entries(prefilter_ctx);
861 let (mask, rows_before_filter) =
862 build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
863
864 let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
865 let rows_selected = final_mask.count_set_bits();
866 let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
867 let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
868
869 Ok(PrefilterResult {
870 refined_selection,
871 filtered_rows,
872 })
873}
874
875fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec<PrefilterEntry> {
876 let mut entries = Vec::new();
877 if prefilter_ctx.pk_filter.is_some() {
878 entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup));
879 }
880 entries.extend(
881 prefilter_ctx
882 .filters
883 .iter()
884 .enumerate()
885 .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))),
886 );
887 entries.extend(
888 prefilter_ctx
889 .physical_filters
890 .iter()
891 .enumerate()
892 .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
893 );
894 entries
895}
896
/// Identifies which filter of a [`PrefilterContext`] a prefilter entry refers to.
#[derive(Clone, Copy)]
enum PrefilterEntryKind {
    /// Index into the context's simple filters.
    Simple(usize),
    /// Index into the context's physical filters.
    Physical(usize),
    /// The primary key filter of the context.
    PkGroup,
}
903
/// A filter to evaluate during the prefilter, with an optional result-cache key.
struct PrefilterEntry {
    /// Which filter this entry refers to.
    kind: PrefilterEntryKind,
    /// Cache key for this entry's mask; `None` when the result is not cached.
    key: Option<PrefilterKey>,
}
908
909impl PrefilterEntry {
910 fn without_cache(kind: PrefilterEntryKind) -> Self {
911 Self { kind, key: None }
912 }
913}
914
915fn build_prefilter_cache_entries(
916 prefilter_ctx: &PrefilterContext,
917 reader_builder: &RowGroupReaderBuilder,
918 build_ctx: &RowGroupBuildContext<'_>,
919) -> Vec<PrefilterEntry> {
920 let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
921 let file_id = reader_builder.file_handle().file_id().file_id();
922 let row_group_idx = build_ctx.row_group_idx as u32;
923 let mut entries = Vec::new();
924
925 for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() {
926 entries.push(PrefilterEntry {
927 kind: PrefilterEntryKind::Simple(idx),
928 key: Some(PrefilterKey::new(
929 file_id,
930 row_group_idx,
931 row_selection.clone(),
932 prefilter_ctx.schema_version,
933 smallvec![filter_ctx.expr_str().to_string()],
934 )),
935 });
936 }
937
938 for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() {
939 if !filter_ctx.is_immutable() {
940 continue;
941 }
942 entries.push(PrefilterEntry {
943 kind: PrefilterEntryKind::Physical(idx),
944 key: Some(PrefilterKey::new(
945 file_id,
946 row_group_idx,
947 row_selection.clone(),
948 prefilter_ctx.schema_version,
949 smallvec![filter_ctx.expr_str().to_string()],
950 )),
951 });
952 }
953
954 if prefilter_ctx.pk_filter.is_some()
955 && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs
956 {
957 entries.push(PrefilterEntry {
958 kind: PrefilterEntryKind::PkGroup,
959 key: Some(PrefilterKey::new(
960 file_id,
961 row_group_idx,
962 row_selection,
963 prefilter_ctx.schema_version,
964 exprs.clone(),
965 )),
966 });
967 }
968
969 entries
970}
971
972fn rows_before_filter(
973 reader_builder: &RowGroupReaderBuilder,
974 build_ctx: &RowGroupBuildContext<'_>,
975) -> usize {
976 build_ctx.row_selection.as_ref().map_or_else(
977 || {
978 reader_builder
979 .parquet_metadata()
980 .row_group(build_ctx.row_group_idx)
981 .num_rows() as usize
982 },
983 RowSelection::row_count,
984 )
985}
986
987fn refined_selection_from_mask(
988 mask: &BooleanBuffer,
989 original_selection: &Option<RowSelection>,
990) -> RowSelection {
991 if mask.is_empty() || mask.count_set_bits() == 0 {
992 return RowSelection::from(vec![]);
993 }
994
995 let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]);
996 match original_selection {
997 Some(original) => original.and_then(&prefilter_selection),
998 None => prefilter_selection,
999 }
1000}
1001
1002fn eval_entry_mask(
1003 batch: &RecordBatch,
1004 prefilter_ctx: &mut PrefilterContext,
1005 kind: PrefilterEntryKind,
1006 file_path: &str,
1007) -> Result<BooleanBuffer> {
1008 match kind {
1009 PrefilterEntryKind::Simple(idx) => {
1010 eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path)
1011 }
1012 PrefilterEntryKind::Physical(idx) => {
1013 eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path)
1014 }
1015 PrefilterEntryKind::PkGroup => {
1016 let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu {
1017 reason: "Missing primary key filter for prefilter cache entry",
1018 })?;
1019 eval_pk_group_mask(batch, pk_filter.as_mut())
1020 }
1021 }
1022}
1023
1024fn eval_pk_group_mask(
1025 batch: &RecordBatch,
1026 pk_filter: &mut dyn PrimaryKeyFilter,
1027) -> Result<BooleanBuffer> {
1028 let (pk_column_index, _) = batch
1029 .schema()
1030 .column_with_name(PRIMARY_KEY_COLUMN_NAME)
1031 .context(UnexpectedSnafu {
1032 reason: "Primary key column not found in prefilter batch",
1033 })?;
1034 let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?;
1035 let mut builder = BooleanBufferBuilder::new(batch.num_rows());
1036 builder.append_n(batch.num_rows(), false);
1037 for range in matched_row_ranges {
1038 for row in range {
1039 builder.set_bit(row, true);
1040 }
1041 }
1042 Ok(builder.finish())
1043}
1044
1045fn eval_simple_filter_mask(
1046 batch: &RecordBatch,
1047 filter_ctx: &SimpleFilterContext,
1048 file_path: &str,
1049) -> Result<BooleanBuffer> {
1050 let filter = match filter_ctx.filter() {
1051 MaybeFilter::Filter(filter) => filter,
1052 MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(batch.num_rows())),
1053 MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(batch.num_rows())),
1054 };
1055
1056 let (idx, _) = batch
1057 .schema()
1058 .column_with_name(filter.column_name())
1059 .with_context(|| UnexpectedSnafu {
1060 reason: format!(
1061 "Prefilter column '{}' (id {}) not found in batch for file {}",
1062 filter.column_name(),
1063 filter_ctx.column_id(),
1064 file_path
1065 ),
1066 })?;
1067 let column = batch.column(idx).clone();
1068 filter.evaluate_array(&column).context(RecordBatchSnafu)
1069}
1070
1071fn eval_physical_filter_mask(
1072 batch: &RecordBatch,
1073 filter_ctx: &PhysicalFilterContext,
1074 file_path: &str,
1075) -> Result<BooleanBuffer> {
1076 let filter = filter_ctx.filter();
1077
1078 let (idx, _) = batch
1079 .schema()
1080 .column_with_name(filter_ctx.column_name())
1081 .with_context(|| UnexpectedSnafu {
1082 reason: format!(
1083 "Prefilter physical column '{}' (id {}) not found in batch for file {}",
1084 filter_ctx.column_name(),
1085 filter_ctx.column_id(),
1086 file_path
1087 ),
1088 })?;
1089 let column = batch.column(idx).clone();
1090
1091 let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
1092 .context(NewRecordBatchSnafu)?;
1093 let evaluated = filter
1094 .evaluate(&record_batch)
1095 .context(EvalPartitionFilterSnafu)?;
1096 let array = evaluated
1097 .into_array(record_batch.num_rows())
1098 .context(EvalPartitionFilterSnafu)?;
1099 let boolean_array = array
1100 .as_any()
1101 .downcast_ref::<BooleanArray>()
1102 .context(UnexpectedSnafu {
1103 reason: "Failed to downcast physical filter result to BooleanArray",
1104 })?;
1105 let mut result = boolean_array.values().clone();
1108 if let Some(nulls) = boolean_array.nulls() {
1109 result = result.bitand(nulls.inner());
1110 }
1111 Ok(result)
1112}
1113
1114#[cfg(test)]
1115mod tests {
1116 use std::sync::Arc;
1117 use std::sync::atomic::{AtomicUsize, Ordering};
1118
1119 use common_recordbatch::filter::SimpleFilterEvaluator;
1120 use datafusion_expr::{col, lit};
1121 use datatypes::arrow::array::{
1122 ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
1123 };
1124 use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
1125 use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
1126 use store_api::codec::PrimaryKeyEncoding;
1127
1128 use super::*;
1129 use crate::read::read_columns::ReadColumns;
1130 use crate::sst::internal_fields;
1131 use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
1132 use crate::test_util::sst_util::{
1133 new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
1134 sst_region_metadata_with_encoding,
1135 };
1136
1137 struct CountingPrimaryKeyFilter {
1138 hits: Arc<AtomicUsize>,
1139 expected: Vec<u8>,
1140 }
1141
1142 impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
1143 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
1144 self.hits.fetch_add(1, Ordering::Relaxed);
1145 Ok(pk == self.expected.as_slice())
1146 }
1147 }
1148
1149 #[test]
1150 fn test_cached_primary_key_filter_reuses_previous_result() {
1151 let expected = new_primary_key(&["a", "x"]);
1152 let hits = Arc::new(AtomicUsize::new(0));
1153 let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
1154 hits: Arc::clone(&hits),
1155 expected: expected.clone(),
1156 }));
1157
1158 assert!(filter.matches(expected.as_slice()).unwrap());
1159 assert!(filter.matches(expected.as_slice()).unwrap());
1160 assert!(
1161 !filter
1162 .matches(new_primary_key(&["b", "x"]).as_slice())
1163 .unwrap()
1164 );
1165
1166 assert_eq!(hits.load(Ordering::Relaxed), 2);
1167 }
1168
1169 fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
1170 exprs
1171 .iter()
1172 .filter_map(SimpleFilterEvaluator::try_new)
1173 .collect()
1174 }
1175
1176 fn new_simple_filter_contexts(
1177 metadata: &RegionMetadataRef,
1178 exprs: &[datafusion_expr::Expr],
1179 ) -> Vec<SimpleFilterContext> {
1180 exprs
1181 .iter()
1182 .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
1183 .collect()
1184 }
1185
1186 fn new_physical_filter_contexts(
1187 metadata: &RegionMetadataRef,
1188 read_format: &FlatReadFormat,
1189 exprs: &[datafusion_expr::Expr],
1190 ) -> Vec<PhysicalFilterContext> {
1191 exprs
1192 .iter()
1193 .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
1194 .collect()
1195 }
1196
1197 fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1198 assert_eq!(primary_keys.len(), field_values.len());
1199
1200 let metadata = Arc::new(sst_region_metadata());
1201 let arrow_schema = metadata.schema.arrow_schema();
1202 let field_column = arrow_schema
1203 .field(arrow_schema.index_of("field_0").unwrap())
1204 .clone();
1205 let time_index_column = arrow_schema
1206 .field(arrow_schema.index_of("ts").unwrap())
1207 .clone();
1208 let mut fields = vec![field_column, time_index_column];
1209 fields.extend(
1210 internal_fields()
1211 .into_iter()
1212 .map(|field| field.as_ref().clone()),
1213 );
1214 let schema = Arc::new(Schema::new(fields));
1215
1216 let mut dict_values = Vec::new();
1217 let mut keys = Vec::with_capacity(primary_keys.len());
1218 for pk in primary_keys {
1219 let key = dict_values
1220 .iter()
1221 .position(|existing: &&[u8]| existing == pk)
1222 .unwrap_or_else(|| {
1223 dict_values.push(*pk);
1224 dict_values.len() - 1
1225 });
1226 keys.push(key as u32);
1227 }
1228 let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1229 UInt32Array::from(keys),
1230 Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1231 ));
1232
1233 RecordBatch::try_new(
1234 schema,
1235 vec![
1236 Arc::new(UInt64Array::from(field_values.to_vec())),
1237 Arc::new(TimestampMillisecondArray::from_iter_values(
1238 0..primary_keys.len() as i64,
1239 )),
1240 pk_array,
1241 Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
1242 Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
1243 ],
1244 )
1245 .unwrap()
1246 }
1247
1248 fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1249 assert_eq!(primary_keys.len(), field_values.len());
1250
1251 let metadata = Arc::new(sst_region_metadata());
1252 let arrow_schema = metadata.schema.arrow_schema();
1253 let field_column = arrow_schema
1254 .field(arrow_schema.index_of("field_0").unwrap())
1255 .clone();
1256 let time_index_column = arrow_schema
1257 .field(arrow_schema.index_of("ts").unwrap())
1258 .clone();
1259 let schema = Arc::new(Schema::new(vec![
1260 field_column,
1261 time_index_column,
1262 internal_fields()[0].as_ref().clone(),
1263 ]));
1264
1265 let mut dict_values = Vec::new();
1266 let mut keys = Vec::with_capacity(primary_keys.len());
1267 for pk in primary_keys {
1268 let key = dict_values
1269 .iter()
1270 .position(|existing: &&[u8]| existing == pk)
1271 .unwrap_or_else(|| {
1272 dict_values.push(*pk);
1273 dict_values.len() - 1
1274 });
1275 keys.push(key as u32);
1276 }
1277 let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1278 UInt32Array::from(keys),
1279 Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1280 ));
1281
1282 RecordBatch::try_new(
1283 schema,
1284 vec![
1285 Arc::new(UInt64Array::from(field_values.to_vec())),
1286 Arc::new(TimestampMillisecondArray::from_iter_values(
1287 0..primary_keys.len() as i64,
1288 )),
1289 pk_array,
1290 ],
1291 )
1292 .unwrap()
1293 }
1294
1295 fn new_prefilter_batch_binary_pk(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1296 assert_eq!(primary_keys.len(), field_values.len());
1297
1298 let metadata = Arc::new(sst_region_metadata());
1299 let arrow_schema = metadata.schema.arrow_schema();
1300 let field_column = arrow_schema
1301 .field(arrow_schema.index_of("field_0").unwrap())
1302 .clone();
1303 let time_index_column = arrow_schema
1304 .field(arrow_schema.index_of("ts").unwrap())
1305 .clone();
1306 let schema = Arc::new(Schema::new(vec![
1307 field_column,
1308 time_index_column,
1309 Field::new(PRIMARY_KEY_COLUMN_NAME, DataType::Binary, false),
1310 ]));
1311
1312 let pk_array: ArrayRef =
1313 Arc::new(BinaryArray::from_iter_values(primary_keys.iter().copied()));
1314
1315 RecordBatch::try_new(
1316 schema,
1317 vec![
1318 Arc::new(UInt64Array::from(field_values.to_vec())),
1319 Arc::new(TimestampMillisecondArray::from_iter_values(
1320 0..primary_keys.len() as i64,
1321 )),
1322 pk_array,
1323 ],
1324 )
1325 .unwrap()
1326 }
1327
1328 fn field_values(batch: &RecordBatch) -> Vec<u64> {
1329 batch
1330 .column(0)
1331 .as_any()
1332 .downcast_ref::<UInt64Array>()
1333 .unwrap()
1334 .values()
1335 .to_vec()
1336 }
1337
1338 fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
1339 filters
1340 .iter()
1341 .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
1342 .collect()
1343 }
1344
#[test]
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
    let metadata = Arc::new(sst_region_metadata());

    // The filter asks for tag_0 == "b" ...
    let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
    let mut primary_key_filter =
        build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);

    // ... while both rows share the single key built from ["a", "x"].
    let pk_a = new_primary_key(&["a", "x"]);
    let batch = new_raw_batch(&[pk_a.as_slice(); 2], &[10, 11]);
    let pk_col_idx = primary_key_column_index(batch.num_columns());

    let outcome =
        prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut());

    // Nothing matches, so the whole batch is dropped.
    assert!(outcome.unwrap().is_none());
}
1361
#[test]
fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
    let metadata = Arc::new(sst_region_metadata());
    let tag_is_a_or_c = col("tag_0")
        .eq(lit("a"))
        .or(col("tag_0").eq(lit("c")));
    let filters = Arc::new(new_test_filters(&[tag_is_a_or_c]));
    let mut primary_key_filter =
        build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);

    // Keys for a, b, c, d; each key is used by two consecutive rows.
    let encoded_keys: Vec<Vec<u8>> = ["a", "b", "c", "d"]
        .iter()
        .map(|&tag| new_primary_key(&[tag, "x"]))
        .collect();
    let row_keys: Vec<&[u8]> = encoded_keys
        .iter()
        .flat_map(|key| [key.as_slice(), key.as_slice()])
        .collect();
    let batch = new_raw_batch(&row_keys, &[10, 11, 12, 13, 14, 15, 16, 17]);
    let pk_col_idx = primary_key_column_index(batch.num_columns());

    let filtered =
        prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
            .unwrap()
            .unwrap();

    // Only the non-adjacent "a" and "c" row pairs survive.
    assert_eq!(filtered.num_rows(), 4);
    assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
}
1397
#[test]
fn test_prefilter_builder_returns_none_without_selected_filters() {
    let metadata: RegionMetadataRef =
        Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
    let codec = build_primary_key_codec(metadata.as_ref());

    // Read every column of the region.
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();

    // No primary-key filters and two empty filter lists are supplied.
    let builder = PrefilterContextBuilder::new(
        &read_format,
        &codec,
        None,
        None,
        Vec::new(),
        Vec::new(),
        metadata.schema_version,
    );

    assert!(builder.is_none());
}
1425
#[test]
fn test_should_use_prefilter() {
    // Each row is (first arg, second arg, third arg, expected decision).
    let cases = [
        (1, 5, 6, true),
        (1, 0, 1, false),
        (1, 1, 2, false),
        (4, 3, 7, false),
        (3, 3, 6, true),
    ];

    for (first, second, third, expected) in cases {
        let decision = should_use_prefilter(first, second, third);
        assert_eq!(
            decision, expected,
            "should_use_prefilter({first}, {second}, {third})"
        );
    }
}
1434
#[test]
fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
    // Case 1: sparse encoding, read format without raw primary key columns.
    let sparse_metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));
    let sparse_columns = ReadColumns::from_deduped_column_ids(
        sparse_metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id),
    );
    let legacy_read_format = FlatReadFormat::new(
        sparse_metadata.clone(),
        sparse_columns,
        None,
        "memtable",
        false,
    )
    .unwrap();
    assert!(!legacy_read_format.batch_has_raw_pk_columns());

    let tag_and_field = Predicate::new(vec![
        col("tag_0").eq(lit("a")),
        col("field_0").gt(lit(1_u64)),
    ]);
    let plan = build_bulk_filter_plan(&legacy_read_format, Some(&tag_and_field));
    // The tag filter becomes a primary-key filter; the field filter remains.
    let pk_filter_count = plan.pk_filters.as_ref().map(|filters| filters.len());
    assert_eq!(pk_filter_count, Some(1));
    assert_eq!(
        remaining_simple_filter_columns(&plan.remaining_simple_filters),
        vec!["field_0"]
    );

    // Case 2: default metadata, read format with raw primary key columns.
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let raw_pk_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let raw_pk_read_format =
        FlatReadFormat::new(metadata.clone(), raw_pk_columns, None, "memtable", true).unwrap();
    assert!(raw_pk_read_format.batch_has_raw_pk_columns());

    // A tag-only predicate stays a simple filter here.
    let tag_only = Predicate::new(vec![col("tag_0").eq(lit("a"))]);
    let tag_only_plan = build_bulk_filter_plan(&raw_pk_read_format, Some(&tag_only));
    assert!(tag_only_plan.pk_filters.is_none());
    assert_eq!(
        remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
        vec!["tag_0"]
    );

    // A field-only predicate never yields primary-key filters.
    let field_only = Predicate::new(vec![col("field_0").gt(lit(1_u64))]);
    let field_only_plan = build_bulk_filter_plan(&raw_pk_read_format, Some(&field_only));
    assert!(field_only_plan.pk_filters.is_none());
    assert_eq!(
        remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
        vec!["field_0"]
    );
}
1501
#[test]
fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let codec = build_primary_key_codec(metadata.as_ref());

    // Case 1: all columns are read and the mode is `SkipFields`.
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let full_read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", true).unwrap();
    let tag_and_field = Predicate::new(vec![
        col("tag_0").eq(lit("a")),
        col("field_0").gt(lit(1_u64)),
    ]);
    let skip_fields_plan = build_reader_filter_plan(
        Some(&tag_and_field),
        None,
        PreFilterMode::SkipFields,
        &full_read_format,
        &codec,
    );
    // A prefilter builder exists and the field filter is left for later.
    assert!(skip_fields_plan.prefilter_builder.is_some());
    assert_eq!(
        remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
        vec!["field_0"]
    );

    // Case 2: only `field_0` and the time index are projected, mode is `All`.
    let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
    let ts = metadata.time_index_column().column_id;
    let projected_columns = ReadColumns::from_deduped_column_ids([field_0, ts]);
    let projected_read_format =
        FlatReadFormat::new(metadata.clone(), projected_columns, None, "test", true).unwrap();
    let tag_only = Predicate::new(vec![col("tag_0").eq(lit("a"))]);
    let pk_prefilter_plan = build_reader_filter_plan(
        Some(&tag_only),
        None,
        PreFilterMode::All,
        &projected_read_format,
        &codec,
    );
    // The tag filter is fully absorbed by the prefilter.
    assert!(pk_prefilter_plan.prefilter_builder.is_some());
    assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
}
1553
#[test]
fn test_pk_filter_expr_strings_are_stable_under_expr_order() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();
    let codec = build_primary_key_codec(metadata.as_ref());

    let expr_a = col("tag_0").eq(lit("a"));
    let expr_b = col("tag_1").eq(lit("x"));

    // The same two expressions, supplied in both orders.
    let predicate_ab = Predicate::new(vec![expr_a.clone(), expr_b.clone()]);
    let predicate_b_a = Predicate::new(vec![expr_b, expr_a]);

    let plan_ab = build_reader_filter_plan(
        Some(&predicate_ab),
        None,
        PreFilterMode::All,
        &read_format,
        &codec,
    );
    let plan_b_a = build_reader_filter_plan(
        Some(&predicate_b_a),
        None,
        PreFilterMode::All,
        &read_format,
        &codec,
    );

    // Both plans must carry the same primary-key filter expression strings.
    let exprs_ab = plan_ab.prefilter_builder.unwrap().pk_filter_expr_strs;
    let exprs_b_a = plan_b_a.prefilter_builder.unwrap().pk_filter_expr_strs;
    assert!(exprs_ab.is_some());
    assert_eq!(exprs_ab, exprs_b_a);
}
1593
#[test]
fn test_simple_and_physical_contexts_preserve_expr_strings() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", true).unwrap();

    // Simple context: `expr_str` equals the expression's `Debug` rendering.
    let simple_expr = col("tag_0").eq(lit("a"));
    let expected_simple = format!("{simple_expr:?}");
    let simple = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap();
    assert_eq!(simple.expr_str(), expected_simple);

    // Physical context: the same holds for an IN-list expression.
    let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false);
    let expected_physical = format!("{physical_expr:?}");
    let physical =
        PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap();
    assert_eq!(physical.expr_str(), expected_physical);
}
1617
#[test]
fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let filters = new_simple_filter_contexts(&metadata, &[tag_is_a]);
    let filter = &filters[0];

    let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);
    let mask = eval_simple_filter_mask(&batch, filter, "test").unwrap();

    // Four bits are expected to be set in the resulting mask.
    assert_eq!(mask.count_set_bits(), 4);
}
1627
#[test]
fn test_eval_simple_filter_mask_errors_on_missing_selected_column() {
    let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
    let tag_is_a = col("tag_0").eq(lit("a"));
    let filters = new_simple_filter_contexts(&metadata, &[tag_is_a]);
    let filter = &filters[0];

    // `new_raw_batch` produces no `tag_0` column, so evaluation must fail.
    let pk = new_primary_key(&["a", "x"]);
    let batch = new_raw_batch(&[pk.as_slice()], &[10]);

    let message = eval_simple_filter_mask(&batch, filter, "test")
        .unwrap_err()
        .to_string();
    assert!(message.contains("Prefilter column"));
    assert!(message.contains("tag_0"));
}
1640
#[test]
fn test_eval_physical_filter_mask_evaluates_physical_filters() {
    let metadata: RegionMetadataRef =
        Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
    let all_columns = ReadColumns::from_deduped_column_ids(
        metadata.column_metadatas.iter().map(|column| column.column_id),
    );
    let read_format =
        FlatReadFormat::new(metadata.clone(), all_columns, None, "test", false).unwrap();

    // field_0 IN (11)
    let field_in_list = col("field_0").in_list(vec![lit(11_u64)], false);
    let physical_filters =
        new_physical_filter_contexts(&metadata, &read_format, &[field_in_list]);
    let physical_filter = &physical_filters[0];

    // Three rows sharing one key, with field values 9, 10 and 11.
    let pk = new_primary_key(&["a", "x"]);
    let batch = new_raw_batch(&[pk.as_slice(); 3], &[9, 10, 11]);

    let mask = eval_physical_filter_mask(&batch, physical_filter, "test").unwrap();

    // Only the row holding 11 is selected.
    assert_eq!(mask.count_set_bits(), 1);
}
1663
#[test]
fn test_eval_pk_group_mask_finds_pk_column_by_name() {
    let metadata = Arc::new(sst_region_metadata());
    let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
    let inner_filter =
        build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
    let mut pk_filter: Box<dyn PrimaryKeyFilter> =
        Box::new(CachedPrimaryKeyFilter::new(inner_filter));

    // Two rows keyed by ["a", "x"] followed by two rows keyed by ["b", "x"],
    // stored in a dictionary-encoded primary key column.
    let pk_a = new_primary_key(&["a", "x"]);
    let pk_b = new_primary_key(&["b", "x"]);
    let row_keys = [
        pk_a.as_slice(),
        pk_a.as_slice(),
        pk_b.as_slice(),
        pk_b.as_slice(),
    ];
    let batch = new_prefilter_batch(&row_keys, &[10, 11, 12, 13]);

    let mask = eval_pk_group_mask(&batch, pk_filter.as_mut()).unwrap();

    // Only the two "a" rows are selected.
    assert_eq!(mask.count_set_bits(), 2);
}
1687
#[test]
fn test_eval_pk_group_mask_handles_binary_pk_column() {
    let metadata = Arc::new(sst_region_metadata());
    let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
    let inner_filter =
        build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
    let mut pk_filter: Box<dyn PrimaryKeyFilter> =
        Box::new(CachedPrimaryKeyFilter::new(inner_filter));

    // Two rows keyed by ["a", "x"] followed by two rows keyed by ["b", "x"],
    // stored in a plain binary primary key column.
    let pk_a = new_primary_key(&["a", "x"]);
    let pk_b = new_primary_key(&["b", "x"]);
    let row_keys = [
        pk_a.as_slice(),
        pk_a.as_slice(),
        pk_b.as_slice(),
        pk_b.as_slice(),
    ];
    let batch = new_prefilter_batch_binary_pk(&row_keys, &[10, 11, 12, 13]);

    let mask = eval_pk_group_mask(&batch, pk_filter.as_mut()).unwrap();

    // Only the two "a" rows are selected.
    assert_eq!(mask.count_set_bits(), 2);
}
1711}