1use std::collections::HashSet;
22use std::ops::{BitAnd, Range};
23use std::sync::Arc;
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::record_batch::RecordBatch;
30use futures::StreamExt;
31use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
32use parquet::arrow::ProjectionMask;
33use parquet::arrow::arrow_reader::RowSelection;
34use parquet::schema::types::SchemaDescriptor;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
38use table::predicate::Predicate;
39
40use crate::error::{
41 ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
42 ReadParquetSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
43};
44use crate::sst::parquet::file_range::PreFilterMode;
45use crate::sst::parquet::flat_format::FlatReadFormat;
46use crate::sst::parquet::format::PrimaryKeyArray;
47use crate::sst::parquet::reader::{
48 MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
49 SimpleFilterContext,
50};
51
52pub(crate) fn matching_row_ranges_by_primary_key(
53 input: &RecordBatch,
54 pk_column_index: usize,
55 pk_filter: &mut dyn PrimaryKeyFilter,
56) -> Result<Vec<Range<usize>>> {
57 let pk_dict_array = input
58 .column(pk_column_index)
59 .as_any()
60 .downcast_ref::<PrimaryKeyArray>()
61 .context(UnexpectedSnafu {
62 reason: "Primary key column is not a dictionary array",
63 })?;
64 let pk_values = pk_dict_array
65 .values()
66 .as_any()
67 .downcast_ref::<BinaryArray>()
68 .context(UnexpectedSnafu {
69 reason: "Primary key values are not binary array",
70 })?;
71 let keys = pk_dict_array.keys();
72 let key_values = keys.values();
73
74 if key_values.is_empty() {
75 return Ok(std::iter::once(0..input.num_rows()).collect());
76 }
77
78 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
79 let mut start = 0;
80 while start < key_values.len() {
81 let key = key_values[start];
82 let mut end = start + 1;
83 while end < key_values.len() && key_values[end] == key {
84 end += 1;
85 }
86
87 if pk_filter
88 .matches(pk_values.value(key as usize))
89 .context(DecodeSnafu)?
90 {
91 if let Some(last) = matched_row_ranges.last_mut()
92 && last.end == start
93 {
94 last.end = end;
95 } else {
96 matched_row_ranges.push(start..end);
97 }
98 }
99
100 start = end;
101 }
102
103 Ok(matched_row_ranges)
104}
105
106pub(crate) fn prefilter_flat_batch_by_primary_key(
109 input: RecordBatch,
110 pk_column_index: usize,
111 pk_filter: &mut dyn PrimaryKeyFilter,
112) -> Result<Option<RecordBatch>> {
113 if input.num_rows() == 0 {
114 return Ok(Some(input));
115 }
116
117 let matched_row_ranges =
118 matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
119 if matched_row_ranges.is_empty() {
120 return Ok(None);
121 }
122
123 if matched_row_ranges.len() == 1
124 && matched_row_ranges[0].start == 0
125 && matched_row_ranges[0].end == input.num_rows()
126 {
127 return Ok(Some(input));
128 }
129
130 if matched_row_ranges.len() == 1 {
131 let span = &matched_row_ranges[0];
132 return Ok(Some(input.slice(span.start, span.end - span.start)));
133 }
134
135 let mut builder = BooleanBufferBuilder::new(input.num_rows());
136 builder.append_n(input.num_rows(), false);
137 for span in matched_row_ranges {
138 for i in span {
139 builder.set_bit(i, true);
140 }
141 }
142
143 let filtered = datatypes::arrow::compute::filter_record_batch(
144 &input,
145 &BooleanArray::new(builder.finish(), None),
146 )
147 .context(ComputeArrowSnafu)?;
148 if filtered.num_rows() == 0 {
149 Ok(None)
150 } else {
151 Ok(Some(filtered))
152 }
153}
154
155pub(crate) struct CachedPrimaryKeyFilter {
156 inner: Box<dyn PrimaryKeyFilter>,
157 last_primary_key: Vec<u8>,
158 last_match: Option<bool>,
159}
160
161impl CachedPrimaryKeyFilter {
162 pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
163 Self {
164 inner,
165 last_primary_key: Vec::new(),
166 last_match: None,
167 }
168 }
169}
170
171impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
172 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
173 if let Some(last_match) = self.last_match
174 && self.last_primary_key == pk
175 {
176 return Ok(last_match);
177 }
178
179 let matched = self.inner.matches(pk)?;
180 self.last_primary_key.clear();
181 self.last_primary_key.extend_from_slice(pk);
182 self.last_match = Some(matched);
183 Ok(matched)
184 }
185}
186
/// Filter split returned by [`build_bulk_filter_plan`].
pub(crate) struct BulkFilterPlan {
    /// Simple filters that were not converted into primary key filters.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Evaluators cloned from tag filters, or `None` when no filter was moved here.
    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
199
/// Filter split returned by [`build_reader_filter_plan`].
pub(crate) struct ReaderFilterPlan {
    /// Simple filters that are not handled by the prefilter builder.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Builder for the prefilter step, or `None` when no prefilter is planned.
    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
}
218
219pub(crate) fn build_bulk_filter_plan(
220 read_format: &FlatReadFormat,
221 predicate: Option<&Predicate>,
222) -> BulkFilterPlan {
223 let metadata = read_format.metadata();
224 let simple_filters: Vec<SimpleFilterContext> = predicate
227 .into_iter()
228 .flat_map(|predicate| {
229 predicate
230 .exprs()
231 .iter()
232 .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
233 })
234 .collect();
235
236 if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
240 return BulkFilterPlan {
241 remaining_simple_filters: simple_filters,
242 pk_filters: None,
243 };
244 }
245
246 let mut remaining_simple_filters = Vec::new();
247 let mut pk_filters = Vec::new();
248
249 for filter_ctx in simple_filters {
250 let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
253 (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
254 });
255
256 if let Some(pk_filter) = pk_filter {
257 pk_filters.push(pk_filter);
258 } else {
259 remaining_simple_filters.push(filter_ctx);
260 }
261 }
262
263 BulkFilterPlan {
264 remaining_simple_filters,
265 pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
266 }
267}
268
269pub(crate) fn build_reader_filter_plan(
284 predicate: Option<&Predicate>,
285 expected_metadata: Option<&RegionMetadata>,
286 pre_filter_mode: PreFilterMode,
287 read_format: &FlatReadFormat,
288 parquet_schema: &SchemaDescriptor,
289 codec: &Arc<dyn PrimaryKeyCodec>,
290) -> ReaderFilterPlan {
291 let Some(predicate) = predicate else {
292 return ReaderFilterPlan {
293 remaining_simple_filters: Vec::new(),
294 prefilter_builder: None,
295 };
296 };
297
298 let metadata = read_format.metadata();
299 let mut prefilter_simple_filters = Vec::new();
300 let mut remaining_simple_filters = Vec::new();
301 let mut prefilter_physical_filters = Vec::new();
302 let mut primary_key_filters = Vec::new();
303 let mut pk_filter_contexts = Vec::new();
304
305 let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
309 let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();
313
314 let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
317 match semantic_type {
318 SemanticType::Tag => !need_pk_prefilter,
319 SemanticType::Field => field_prefilter_enabled,
320 SemanticType::Timestamp => true,
321 }
322 };
323
324 for expr in predicate.exprs() {
325 if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
328 let Some(filter) = filter_ctx.filter().as_filter() else {
332 remaining_simple_filters.push(filter_ctx);
333 continue;
334 };
335
336 let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
339 if direct_prefilter {
340 assert!(
341 read_format
342 .arrow_schema()
343 .column_with_name(filter.column_name())
344 .is_some(),
345 "Column '{}' is not present in the arrow schema {:?}",
346 filter.column_name(),
347 read_format.arrow_schema(),
348 );
349 prefilter_simple_filters.push(filter_ctx);
350 continue;
351 }
352
353 if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
355 primary_key_filters.push(filter.clone());
356 pk_filter_contexts.push(filter_ctx);
357 } else {
358 remaining_simple_filters.push(filter_ctx);
359 }
360 continue;
361 }
362
363 if let Some(filter) =
368 PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
369 && can_direct_prefilter(filter.semantic_type())
370 {
371 prefilter_physical_filters.push(filter);
372 }
373 }
374
375 let pk_filter_exprs =
376 (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
377 let prefilter_builder = PrefilterContextBuilder::new(
378 read_format,
379 codec,
380 pk_filter_exprs,
381 prefilter_simple_filters.clone(),
382 prefilter_physical_filters,
383 parquet_schema,
384 );
385
386 if prefilter_builder.is_some() {
387 ReaderFilterPlan {
388 remaining_simple_filters,
389 prefilter_builder,
390 }
391 } else {
392 remaining_simple_filters.extend(prefilter_simple_filters);
395 remaining_simple_filters.extend(pk_filter_contexts);
396 ReaderFilterPlan {
397 remaining_simple_filters,
398 prefilter_builder: None,
399 }
400 }
401}
402
/// State consumed by [`execute_prefilter`] for one prefilter run.
pub(crate) struct PrefilterContext {
    /// Projection passed to the row group reader when reading prefilter columns.
    projection: ProjectionMask,
    /// Optional filter evaluated against the primary key column of each batch.
    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Simple filters evaluated against columns looked up by name in each batch.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters, each evaluated on a single-column record batch.
    physical_filters: Vec<PhysicalFilterContext>,
}
415
/// Holds everything needed to create [`PrefilterContext`] values via `build`.
pub(crate) struct PrefilterContextBuilder {
    /// Projection covering the columns referenced by the prefilter filters.
    projection: ProjectionMask,
    /// Evaluators used to create the primary key filter, if any.
    pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Simple filters cloned into each built context.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters cloned into each built context.
    physical_filters: Vec<PhysicalFilterContext>,
    /// Codec that creates the primary key filter from `pk_filters`.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Region metadata handed to the codec when creating the primary key filter.
    metadata: RegionMetadataRef,
}
429
430impl PrefilterContextBuilder {
431 pub(crate) fn new(
438 read_format: &FlatReadFormat,
439 codec: &Arc<dyn PrimaryKeyCodec>,
440 primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
441 filters: Vec<SimpleFilterContext>,
442 physical_filters: Vec<PhysicalFilterContext>,
443 parquet_schema: &SchemaDescriptor,
444 ) -> Option<Self> {
445 let metadata = read_format.metadata();
446 let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
447 let pk_filters = (!use_raw_tag_columns)
448 .then_some(primary_key_filters)
449 .flatten()
450 .filter(|filters| !filters.is_empty());
451
452 let mut prefilter_column_names = HashSet::new();
453 for filter_ctx in &filters {
454 if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
455 prefilter_column_names.insert(filter.column_name().to_string());
456 }
457 }
458
459 if pk_filters.is_some() {
460 prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
461 }
462
463 for filter_ctx in &physical_filters {
464 prefilter_column_names.insert(filter_ctx.column_name().to_string());
465 }
466
467 let (projection, prefilter_count) = compute_projection_mask(
468 &prefilter_column_names,
469 read_format.arrow_schema(),
470 parquet_schema,
471 );
472
473 if prefilter_count == 0 {
474 return None;
475 }
476
477 let total_count = read_format.parquet_read_columns().root_indices().len();
478 let remaining_count = total_count.saturating_sub(prefilter_count);
479 if pk_filters.is_none() && prefilter_count >= total_count {
480 return None;
481 }
482
483 if pk_filters.is_none()
484 && !should_use_prefilter(prefilter_count, remaining_count, total_count)
485 {
486 return None;
487 }
488
489 Some(Self {
490 projection,
491 pk_filters,
492 filters,
493 physical_filters,
494 codec: Arc::clone(codec),
495 metadata: metadata.clone(),
496 })
497 }
498
499 pub(crate) fn build(&self) -> PrefilterContext {
501 let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
502 let pk_filter =
503 self.codec
504 .primary_key_filter(&self.metadata, Arc::clone(pk_filters), false);
505 Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
506 });
507 PrefilterContext {
508 projection: self.projection.clone(),
509 pk_filter,
510 filters: self.filters.clone(),
511 physical_filters: self.physical_filters.clone(),
512 }
513 }
514}
515
/// Largest `prefilter_count / total_count` ratio accepted by `should_use_prefilter`.
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
/// Smallest `remaining_count` accepted by `should_use_prefilter`.
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;
518
/// Outcome of [`execute_prefilter`].
pub(crate) struct PrefilterResult {
    /// Row selection after applying the prefilter masks; empty when no row was selected.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows read by the prefilter minus the number of rows it selected.
    pub(crate) filtered_rows: usize,
}
526
527fn compute_projection_mask(
532 column_names: &HashSet<String>,
533 arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
534 parquet_schema: &SchemaDescriptor,
535) -> (ProjectionMask, usize) {
536 let mut projection_indices: Vec<usize> = column_names
537 .iter()
538 .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
539 .collect();
540 projection_indices.sort_unstable();
541 projection_indices.dedup();
542 let count = projection_indices.len();
543 (
544 ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()),
545 count,
546 )
547}
548
549fn should_use_prefilter(
550 prefilter_count: usize,
551 remaining_count: usize,
552 total_count: usize,
553) -> bool {
554 if remaining_count == 0 {
555 return false;
556 }
557
558 if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
559 return false;
560 }
561
562 let ratio = prefilter_count as f64 / total_count as f64;
563 ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
564}
565
566pub(crate) async fn execute_prefilter(
567 prefilter_ctx: &mut PrefilterContext,
568 reader_builder: &RowGroupReaderBuilder,
569 build_ctx: &RowGroupBuildContext<'_>,
570) -> Result<PrefilterResult> {
571 let mut stream = reader_builder
572 .build_with_projection(
573 build_ctx.row_group_idx,
574 build_ctx.row_selection.clone(),
575 prefilter_ctx.projection.clone(),
576 build_ctx.fetch_metrics,
577 )
578 .await?;
579
580 let mut filter_arrays = Vec::new();
581 let mut rows_before_filter = 0usize;
582 let mut rows_selected = 0usize;
583
584 while let Some(batch_result) = stream.next().await {
585 let batch = batch_result.context(ReadParquetSnafu {
586 path: reader_builder.file_path(),
587 })?;
588 let num_rows = batch.num_rows();
589 if num_rows == 0 {
590 continue;
591 }
592 rows_before_filter += num_rows;
593
594 let batch_mask = match apply_filters_to_batch(
595 &batch,
596 &mut prefilter_ctx.pk_filter,
597 &prefilter_ctx.filters,
598 &prefilter_ctx.physical_filters,
599 reader_builder.file_path(),
600 )? {
601 Some(mask) => mask,
602 None => BooleanBuffer::new_unset(num_rows),
603 };
604 rows_selected += batch_mask.count_set_bits();
605 filter_arrays.push(BooleanArray::from(batch_mask));
606 }
607
608 let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
609 let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 {
610 RowSelection::from(vec![])
611 } else {
612 let prefilter_selection = RowSelection::from_filters(&filter_arrays);
613 match &build_ctx.row_selection {
614 Some(original) => original.and_then(&prefilter_selection),
615 None => prefilter_selection,
616 }
617 };
618
619 Ok(PrefilterResult {
620 refined_selection,
621 filtered_rows,
622 })
623}
624
625fn apply_filters_to_batch(
626 batch: &RecordBatch,
627 pk_filter: &mut Option<Box<dyn PrimaryKeyFilter>>,
628 filters: &[SimpleFilterContext],
629 physical_filters: &[PhysicalFilterContext],
630 file_path: &str,
631) -> Result<Option<BooleanBuffer>> {
632 let mut mask = BooleanBuffer::new_set(batch.num_rows());
633
634 if let Some(pk_filter) = pk_filter.as_mut() {
635 let pk_column_index = batch.num_columns() - 1;
639 let matched_row_ranges =
640 matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter.as_mut())?;
641 let mut builder = BooleanBufferBuilder::new(batch.num_rows());
642 builder.append_n(batch.num_rows(), false);
643 for range in matched_row_ranges {
644 for row in range {
645 builder.set_bit(row, true);
646 }
647 }
648 mask = mask.bitand(&builder.finish());
649 }
650
651 for filter_ctx in filters {
652 let filter = match filter_ctx.filter() {
653 MaybeFilter::Filter(filter) => filter,
654 MaybeFilter::Matched => continue,
655 MaybeFilter::Pruned => return Ok(None),
656 };
657
658 let (idx, _) = batch
659 .schema()
660 .column_with_name(filter.column_name())
661 .with_context(|| UnexpectedSnafu {
662 reason: format!(
663 "Prefilter column '{}' (id {}) not found in batch for file {}",
664 filter.column_name(),
665 filter_ctx.column_id(),
666 file_path
667 ),
668 })?;
669 let column = batch.column(idx).clone();
670 let result = filter.evaluate_array(&column).context(RecordBatchSnafu)?;
671 mask = mask.bitand(&result);
672 }
673
674 for filter_ctx in physical_filters {
675 let filter = filter_ctx.filter();
676
677 let (idx, _) = batch
678 .schema()
679 .column_with_name(filter_ctx.column_name())
680 .with_context(|| UnexpectedSnafu {
681 reason: format!(
682 "Prefilter physical column '{}' (id {}) not found in batch for file {}",
683 filter_ctx.column_name(),
684 filter_ctx.column_id(),
685 file_path
686 ),
687 })?;
688 let column = batch.column(idx).clone();
689
690 let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
691 .context(NewRecordBatchSnafu)?;
692 let evaluated = filter
693 .evaluate(&record_batch)
694 .context(EvalPartitionFilterSnafu)?;
695 let array = evaluated
696 .into_array(record_batch.num_rows())
697 .context(EvalPartitionFilterSnafu)?;
698 let boolean_array =
699 array
700 .as_any()
701 .downcast_ref::<BooleanArray>()
702 .context(UnexpectedSnafu {
703 reason: "Failed to downcast physical filter result to BooleanArray",
704 })?;
705 let mut result = boolean_array.values().clone();
708 if let Some(nulls) = boolean_array.nulls() {
709 result = result.bitand(nulls.inner());
710 }
711 mask = mask.bitand(&result);
712 }
713
714 if mask.count_set_bits() == 0 {
715 Ok(None)
716 } else {
717 Ok(Some(mask))
718 }
719}
720
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use common_recordbatch::filter::SimpleFilterEvaluator;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{Schema, UInt32Type};
    use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
    use parquet::arrow::ArrowSchemaConverter;
    use store_api::codec::PrimaryKeyEncoding;

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::internal_fields;
    use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
    use crate::test_util::sst_util::{
        new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    /// Test filter that matches exactly `expected` and counts how often it is called.
    struct CountingPrimaryKeyFilter {
        hits: Arc<AtomicUsize>,
        expected: Vec<u8>,
    }

    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Ok(pk == self.expected.as_slice())
        }
    }

    #[test]
    fn test_cached_primary_key_filter_reuses_previous_result() {
        let expected = new_primary_key(&["a", "x"]);
        let hits = Arc::new(AtomicUsize::new(0));
        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
            hits: Arc::clone(&hits),
            expected: expected.clone(),
        }));

        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(
            !filter
                .matches(new_primary_key(&["b", "x"]).as_slice())
                .unwrap()
        );

        // The repeated key is served from the cache, so the inner filter runs only twice.
        assert_eq!(hits.load(Ordering::Relaxed), 2);
    }

    /// Converts expressions into evaluators, skipping the ones that cannot be converted.
    fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
        exprs
            .iter()
            .filter_map(SimpleFilterEvaluator::try_new)
            .collect()
    }

    /// Converts expressions into simple filter contexts, skipping unsupported ones.
    fn new_simple_filter_contexts(
        metadata: &RegionMetadataRef,
        exprs: &[datafusion_expr::Expr],
    ) -> Vec<SimpleFilterContext> {
        exprs
            .iter()
            .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
            .collect()
    }

    /// Converts expressions into physical filter contexts, skipping unsupported ones.
    fn new_physical_filter_contexts(
        metadata: &RegionMetadataRef,
        read_format: &FlatReadFormat,
        exprs: &[datafusion_expr::Expr],
    ) -> Vec<PhysicalFilterContext> {
        exprs
            .iter()
            .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
            .collect()
    }

    /// Derives a parquet schema descriptor from the read format's arrow schema.
    fn parquet_schema(read_format: &FlatReadFormat) -> SchemaDescriptor {
        ArrowSchemaConverter::new()
            .convert(read_format.arrow_schema())
            .unwrap()
    }

    /// Builds a batch with `field_0`, `ts` and all internal columns.
    ///
    /// The primary key column is a dictionary whose values are the distinct entries of
    /// `primary_keys` in first-seen order.
    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        let metadata = Arc::new(sst_region_metadata());
        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let mut fields = vec![field_column, time_index_column];
        fields.extend(
            internal_fields()
                .into_iter()
                .map(|field| field.as_ref().clone()),
        );
        let schema = Arc::new(Schema::new(fields));

        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }
        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
            ],
        )
        .unwrap()
    }

    /// Builds a batch with only `field_0`, `ts` and the primary key dictionary, which is
    /// the last column.
    fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        // The raw batch lays out `field_0`, `ts`, then the primary key dictionary, so
        // keeping the first three columns drops the two trailing internal columns.
        new_raw_batch(primary_keys, field_values)
            .project(&[0, 1, 2])
            .unwrap()
    }

    /// Returns the values of the first column, which holds `field_0` in the test batches.
    fn field_values(batch: &RecordBatch) -> Vec<u64> {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    /// Returns the column name of every filter; panics if a filter has no evaluator.
    fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
        filters
            .iter()
            .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
            .collect()
    }

    #[test]
    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap();

        assert!(filtered.is_none());
    }

    #[test]
    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0")
            .eq(lit("a"))
            .or(col("tag_0").eq(lit("c")))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let pk_c = new_primary_key(&["c", "x"]);
        let pk_d = new_primary_key(&["d", "x"]);
        let batch = new_raw_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
                pk_c.as_slice(),
                pk_c.as_slice(),
                pk_d.as_slice(),
                pk_d.as_slice(),
            ],
            &[10, 11, 12, 13, 14, 15, 16, 17],
        );
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap()
                .unwrap();

        assert_eq!(filtered.num_rows(), 4);
        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
    }

    #[test]
    fn test_prefilter_builder_returns_none_without_selected_filters() {
        let metadata: RegionMetadataRef =
            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            false,
        )
        .unwrap();
        let codec = build_primary_key_codec(metadata.as_ref());
        let parquet_schema = parquet_schema(&read_format);

        let builder = PrefilterContextBuilder::new(
            &read_format,
            &codec,
            None,
            Vec::new(),
            Vec::new(),
            &parquet_schema,
        );
        assert!(builder.is_none());
    }

    #[test]
    fn test_should_use_prefilter() {
        assert!(should_use_prefilter(1, 5, 6));
        assert!(!should_use_prefilter(1, 0, 1));
        assert!(!should_use_prefilter(1, 1, 2));
        assert!(!should_use_prefilter(4, 3, 7));
        assert!(should_use_prefilter(3, 3, 6));
    }

    #[test]
    fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let legacy_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "memtable",
            false,
        )
        .unwrap();
        assert!(!legacy_read_format.batch_has_raw_pk_columns());

        let plan = build_bulk_filter_plan(
            &legacy_read_format,
            Some(&Predicate::new(vec![
                col("tag_0").eq(lit("a")),
                col("field_0").gt(lit(1_u64)),
            ])),
        );
        assert_eq!(
            plan.pk_filters.as_ref().map(|filters| filters.len()),
            Some(1)
        );
        assert_eq!(
            remaining_simple_filter_columns(&plan.remaining_simple_filters),
            vec!["field_0"]
        );

        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let raw_pk_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "memtable",
            true,
        )
        .unwrap();
        assert!(raw_pk_read_format.batch_has_raw_pk_columns());

        let tag_only_plan = build_bulk_filter_plan(
            &raw_pk_read_format,
            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
        );
        assert!(tag_only_plan.pk_filters.is_none());
        assert_eq!(
            remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
            vec!["tag_0"]
        );

        let field_only_plan = build_bulk_filter_plan(
            &raw_pk_read_format,
            Some(&Predicate::new(vec![col("field_0").gt(lit(1_u64))])),
        );
        assert!(field_only_plan.pk_filters.is_none());
        assert_eq!(
            remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
            vec!["field_0"]
        );
    }

    #[test]
    fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let full_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();
        let full_parquet_schema = parquet_schema(&full_read_format);
        let codec = build_primary_key_codec(metadata.as_ref());

        let skip_fields_plan = build_reader_filter_plan(
            Some(&Predicate::new(vec![
                col("tag_0").eq(lit("a")),
                col("field_0").gt(lit(1_u64)),
            ])),
            None,
            PreFilterMode::SkipFields,
            &full_read_format,
            &full_parquet_schema,
            &codec,
        );
        assert!(skip_fields_plan.prefilter_builder.is_some());
        assert_eq!(
            remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
            vec!["field_0"]
        );

        let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
        let ts = metadata.time_index_column().column_id;
        let projected_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([field_0, ts]),
            None,
            "test",
            true,
        )
        .unwrap();
        let projected_parquet_schema = parquet_schema(&projected_read_format);
        let pk_prefilter_plan = build_reader_filter_plan(
            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
            None,
            PreFilterMode::All,
            &projected_read_format,
            &projected_parquet_schema,
            &codec,
        );
        assert!(pk_prefilter_plan.prefilter_builder.is_some());
        assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
    }

    #[test]
    fn test_apply_filters_to_batch_uses_flat_tag_columns_directly() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
        let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);

        let mut no_pk_filter = None;
        let mask = apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test")
            .unwrap()
            .unwrap();
        assert_eq!(mask.count_set_bits(), 4);
    }

    #[test]
    fn test_apply_filters_to_batch_errors_on_missing_selected_column() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
        let pk = new_primary_key(&["a", "x"]);
        // The raw batch has no `tag_0` column, so the lookup by name must fail.
        let batch = new_raw_batch(&[pk.as_slice()], &[10]);

        let mut no_pk_filter = None;
        let err =
            apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test").unwrap_err();
        let err = err.to_string();
        assert!(err.contains("Prefilter column"));
        assert!(err.contains("tag_0"));
    }

    #[test]
    fn test_apply_filters_to_batch_evaluates_physical_filters() {
        let metadata: RegionMetadataRef =
            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            false,
        )
        .unwrap();
        let expr = col("field_0").in_list(vec![lit(11_u64)], false);
        let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[expr]);
        let pk = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]);

        let mut no_pk_filter = None;
        let mask =
            apply_filters_to_batch(&batch, &mut no_pk_filter, &[], &physical_filters, "test")
                .unwrap()
                .unwrap();
        assert_eq!(mask.count_set_bits(), 1);
    }

    #[test]
    fn test_apply_filters_to_batch_uses_last_projected_column_for_pk_prefilter() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
        let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
            build_primary_key_codec(metadata.as_ref())
                .primary_key_filter(&metadata, filters, false),
        )) as Box<dyn PrimaryKeyFilter>);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let batch = new_prefilter_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
            ],
            &[10, 11, 12, 13],
        );

        let mask = apply_filters_to_batch(&batch, &mut pk_filter, &[], &[], "test")
            .unwrap()
            .unwrap();

        assert_eq!(mask.count_set_bits(), 2);
    }
}