Skip to main content

mito2/sst/parquet/
prefilter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prefilter framework for parquet reader.
16//!
17//! Prefilter optimization reduces I/O by reading only a subset of columns first
18//! (the prefilter phase), applying filters to compute a refined row selection,
19//! then reading the remaining columns with the refined selection.
20
21use std::collections::HashSet;
22use std::ops::{BitAnd, Range};
23use std::sync::Arc;
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::datatypes::SchemaRef;
30use datatypes::arrow::record_batch::RecordBatch;
31use futures::StreamExt;
32use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
33use parquet::arrow::ProjectionMask;
34use parquet::arrow::arrow_reader::RowSelection;
35use parquet::schema::types::SchemaDescriptor;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
40use table::predicate::Predicate;
41
42use crate::cache::PrefilterKey;
43use crate::error::{
44    ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
45    RecordBatchSnafu, Result, UnexpectedSnafu,
46};
47use crate::sst::parquet::file_range::PreFilterMode;
48use crate::sst::parquet::flat_format::FlatReadFormat;
49use crate::sst::parquet::format::PrimaryKeyArray;
50use crate::sst::parquet::reader::{
51    MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
52    SimpleFilterContext,
53};
54
55pub(crate) fn matching_row_ranges_by_primary_key(
56    input: &RecordBatch,
57    pk_column_index: usize,
58    pk_filter: &mut dyn PrimaryKeyFilter,
59) -> Result<Vec<Range<usize>>> {
60    let pk_dict_array = input
61        .column(pk_column_index)
62        .as_any()
63        .downcast_ref::<PrimaryKeyArray>()
64        .context(UnexpectedSnafu {
65            reason: "Primary key column is not a dictionary array",
66        })?;
67    let pk_values = pk_dict_array
68        .values()
69        .as_any()
70        .downcast_ref::<BinaryArray>()
71        .context(UnexpectedSnafu {
72            reason: "Primary key values are not binary array",
73        })?;
74    let keys = pk_dict_array.keys();
75    let key_values = keys.values();
76
77    if key_values.is_empty() {
78        return Ok(std::iter::once(0..input.num_rows()).collect());
79    }
80
81    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
82    let mut start = 0;
83    while start < key_values.len() {
84        let key = key_values[start];
85        let mut end = start + 1;
86        while end < key_values.len() && key_values[end] == key {
87            end += 1;
88        }
89
90        if pk_filter
91            .matches(pk_values.value(key as usize))
92            .context(DecodeSnafu)?
93        {
94            if let Some(last) = matched_row_ranges.last_mut()
95                && last.end == start
96            {
97                last.end = end;
98            } else {
99                matched_row_ranges.push(start..end);
100            }
101        }
102
103        start = end;
104    }
105
106    Ok(matched_row_ranges)
107}
108
109/// Filters a flat-format record batch by primary key, returning only rows whose
110/// primary key matches the filter. Returns `None` if all rows are filtered out.
111pub(crate) fn prefilter_flat_batch_by_primary_key(
112    input: RecordBatch,
113    pk_column_index: usize,
114    pk_filter: &mut dyn PrimaryKeyFilter,
115) -> Result<Option<RecordBatch>> {
116    if input.num_rows() == 0 {
117        return Ok(Some(input));
118    }
119
120    let matched_row_ranges =
121        matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
122    if matched_row_ranges.is_empty() {
123        return Ok(None);
124    }
125
126    if matched_row_ranges.len() == 1
127        && matched_row_ranges[0].start == 0
128        && matched_row_ranges[0].end == input.num_rows()
129    {
130        return Ok(Some(input));
131    }
132
133    if matched_row_ranges.len() == 1 {
134        let span = &matched_row_ranges[0];
135        return Ok(Some(input.slice(span.start, span.end - span.start)));
136    }
137
138    let mut builder = BooleanBufferBuilder::new(input.num_rows());
139    builder.append_n(input.num_rows(), false);
140    for span in matched_row_ranges {
141        for i in span {
142            builder.set_bit(i, true);
143        }
144    }
145
146    let filtered = datatypes::arrow::compute::filter_record_batch(
147        &input,
148        &BooleanArray::new(builder.finish(), None),
149    )
150    .context(ComputeArrowSnafu)?;
151    if filtered.num_rows() == 0 {
152        Ok(None)
153    } else {
154        Ok(Some(filtered))
155    }
156}
157
158pub(crate) struct CachedPrimaryKeyFilter {
159    inner: Box<dyn PrimaryKeyFilter>,
160    last_primary_key: Vec<u8>,
161    last_match: Option<bool>,
162}
163
164impl CachedPrimaryKeyFilter {
165    pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
166        Self {
167            inner,
168            last_primary_key: Vec::new(),
169            last_match: None,
170        }
171    }
172}
173
174impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
175    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
176        if let Some(last_match) = self.last_match
177            && self.last_primary_key == pk
178        {
179            return Ok(last_match);
180        }
181
182        let matched = self.inner.matches(pk)?;
183        self.last_primary_key.clear();
184        self.last_primary_key.extend_from_slice(pk);
185        self.last_match = Some(matched);
186        Ok(matched)
187    }
188}
189
/// How the bulk-memtable read should apply each predicate.
///
/// Unlike the parquet reader, the bulk path has no prefilter pass; predicates
/// either run row-wise inside the iterator or are pushed down to encoded-PK
/// matching when the batch still carries the primary-key column.
///
/// Produced by [`build_bulk_filter_plan`].
pub(crate) struct BulkFilterPlan {
    /// Simple filters the iterator still has to evaluate row-wise on each batch.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Tag predicates lowered to encoded-PK filters. `None` when the batch
    /// already exposes raw tag columns, the region has no primary key, or
    /// there are no tag predicates.
    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
202
/// How the parquet reader should apply each predicate.
///
/// The reader runs in two phases. Predicates routed into `prefilter_builder`
/// execute on a reduced column set first to compute a refined row selection;
/// `remaining_simple_filters` execute alongside the full projection on the
/// normal read path. The contract for what is precise vs best-effort is
/// documented on [`build_reader_filter_plan`].
pub(crate) struct ReaderFilterPlan {
    /// Simple filters that must run on the normal read path: predicates with
    /// `Matched` / `Pruned` outcomes (which carry expected-metadata
    /// compatibility decisions later phases rely on), and predicates whose
    /// column cannot be read directly during the prefilter pass. When
    /// `prefilter_builder` is `None`, this also holds the simple filters that
    /// would otherwise have run in the prefilter pass.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Pre-built state for the prefilter pass, or `None` when prefiltering is
    /// not worthwhile (no prefilter columns selected, or the prefilter
    /// projection would cover nearly the full read).
    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
}
221
222pub(crate) fn build_bulk_filter_plan(
223    read_format: &FlatReadFormat,
224    predicate: Option<&Predicate>,
225) -> BulkFilterPlan {
226    let metadata = read_format.metadata();
227    // Bulk memtable only needs simple binary filters here. Any filter that
228    // cannot be reduced to a SimpleFilterContext stays out of this fast path.
229    let simple_filters: Vec<SimpleFilterContext> = predicate
230        .into_iter()
231        .flat_map(|predicate| {
232            predicate
233                .exprs()
234                .iter()
235                .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
236        })
237        .collect();
238
239    // PK prefilter only works when flat batches still carry the encoded PK
240    // column. If tags have already been expanded to raw columns, the iterator
241    // can apply those filters directly and there is nothing to extract here.
242    if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
243        return BulkFilterPlan {
244            remaining_simple_filters: simple_filters,
245            pk_filters: None,
246        };
247    }
248
249    let mut remaining_simple_filters = Vec::new();
250    let mut pk_filters = Vec::new();
251
252    for filter_ctx in simple_filters {
253        // Split tag predicates that can be evaluated against the encoded PK
254        // from filters that still need normal row-wise evaluation later.
255        let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
256            (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
257        });
258
259        if let Some(pk_filter) = pk_filter {
260            pk_filters.push(pk_filter);
261        } else {
262            remaining_simple_filters.push(filter_ctx);
263        }
264    }
265
266    BulkFilterPlan {
267        remaining_simple_filters,
268        pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
269    }
270}
271
272/// Splits a query [`Predicate`] into a [`ReaderFilterPlan`]: predicates that can run
273/// during the prefilter pass (on a reduced projection, to compute a refined row
274/// selection) versus predicates that must run on the normal read path (alongside the
275/// full projection).
276///
277/// The prefilter pass is *best-effort pruning*: a physical-filter predicate is silently
278/// dropped when [`PhysicalFilterContext::new_opt`] returns `None` (column not in the
279/// projected arrow schema). This is safe because the DataFusion `FilterExec` above the
280/// reader always re-applies the original predicate, so the prefilter pass is purely a
281/// pruning hint.
282///
283/// Tag and timestamp predicates that lower to [`SimpleFilterEvaluator`] are an
284/// exception — the engine enforces them precisely, so the prefilter pass is the only
285/// place they execute. They are never silently dropped.
286pub(crate) fn build_reader_filter_plan(
287    predicate: Option<&Predicate>,
288    expected_metadata: Option<&RegionMetadata>,
289    pre_filter_mode: PreFilterMode,
290    read_format: &FlatReadFormat,
291    codec: &Arc<dyn PrimaryKeyCodec>,
292) -> ReaderFilterPlan {
293    let Some(predicate) = predicate else {
294        return ReaderFilterPlan {
295            remaining_simple_filters: Vec::new(),
296            prefilter_builder: None,
297        };
298    };
299
300    let metadata = read_format.metadata();
301    let mut prefilter_simple_filters = Vec::new();
302    let mut remaining_simple_filters = Vec::new();
303    let mut prefilter_physical_filters = Vec::new();
304    let mut primary_key_filters = Vec::new();
305    let mut pk_filter_contexts = Vec::new();
306
307    // `SkipFields` keeps field predicates in the normal read path to avoid a
308    // second read of projected field columns, while tags/timestamp can still
309    // participate in prefiltering.
310    let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
311    // When true, tag columns are encoded in the primary key column and are NOT
312    // stored as separate parquet columns. Tag predicates must go through PK
313    // decoding rather than direct column reads.
314    let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();
315
316    // Whether a column can be read directly from parquet for prefiltering,
317    // based on its semantic type and the current mode/format.
318    let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
319        match semantic_type {
320            SemanticType::Tag => !need_pk_prefilter,
321            SemanticType::Field => field_prefilter_enabled,
322            SemanticType::Timestamp => true,
323        }
324    };
325
326    for expr in predicate.exprs() {
327        // Prefer cheap simple filters first. They also preserve `Matched` /
328        // `Pruned` states for columns that only exist in expected metadata.
329        if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
330            // `Matched` and `Pruned` come from expected-metadata compatibility
331            // and must stay in the main filter list so later phases keep that
332            // outcome.
333            let Some(filter) = filter_ctx.filter().as_filter() else {
334                remaining_simple_filters.push(filter_ctx);
335                continue;
336            };
337
338            // If the column is stored as a separate parquet column and is already projected in the main read,
339            // we can evaluate the simple filter directly during prefilter.
340            let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
341            if direct_prefilter {
342                assert!(
343                    read_format
344                        .arrow_schema()
345                        .column_with_name(filter.column_name())
346                        .is_some(),
347                    "Column '{}' is not present in the arrow schema {:?}",
348                    filter.column_name(),
349                    read_format.arrow_schema(),
350                );
351                prefilter_simple_filters.push(filter_ctx);
352                continue;
353            }
354
355            // Otherwise try to filter through encoded-PK matching.
356            if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
357                primary_key_filters.push(filter.clone());
358                pk_filter_contexts.push(filter_ctx);
359            } else {
360                remaining_simple_filters.push(filter_ctx);
361            }
362            continue;
363        }
364
365        // Best-effort physical-filter prefilter (see fn-level doc): `new_opt`
366        // returning `None` means the column is not in the projected arrow
367        // schema, and dropping the predicate is safe because the upper
368        // `FilterExec` re-applies it.
369        if let Some(filter) =
370            PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
371            && can_direct_prefilter(filter.semantic_type())
372        {
373            prefilter_physical_filters.push(filter);
374        }
375    }
376
377    let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
378        let mut expr_strs = pk_filter_contexts
379            .iter()
380            .map(|filter_ctx| filter_ctx.expr_str().to_string())
381            .collect::<Vec<_>>();
382        expr_strs.sort();
383        SmallVec::from_vec(expr_strs)
384    });
385    let pk_filter_exprs =
386        (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
387    let schema_version = expected_metadata
388        .map(|metadata| metadata.schema_version)
389        .unwrap_or_else(|| read_format.metadata().schema_version);
390    let prefilter_builder = PrefilterContextBuilder::new(
391        read_format,
392        codec,
393        pk_filter_exprs,
394        pk_filter_expr_strs,
395        prefilter_simple_filters.clone(),
396        prefilter_physical_filters,
397        schema_version,
398    );
399
400    if prefilter_builder.is_some() {
401        ReaderFilterPlan {
402            remaining_simple_filters,
403            prefilter_builder,
404        }
405    } else {
406        // If prefilter setup is not worthwhile, keep the original simple
407        // filters on the normal path so behavior is unchanged.
408        remaining_simple_filters.extend(prefilter_simple_filters);
409        remaining_simple_filters.extend(pk_filter_contexts);
410        ReaderFilterPlan {
411            remaining_simple_filters,
412            prefilter_builder: None,
413        }
414    }
415}
416
/// Context for prefiltering a row group.
///
/// Created by [`PrefilterContextBuilder::build`].
pub(crate) struct PrefilterContext {
    /// Optional PK filter for legacy primary-key-format parquet.
    /// `None` when the builder holds no encoded-PK filters.
    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Simple filters that can be evaluated directly from the prefilter batch.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters that can be evaluated directly from the prefilter batch.
    /// Physical expressions are only applied in the prefilter phase.
    physical_filters: Vec<PhysicalFilterContext>,
    /// Region schema version used in per-filter cache keys.
    schema_version: u64,
    /// Sorted expression strings for the encoded-PK filter group.
    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
    /// Arrow schema used to build narrowed prefilter projections.
    arrow_schema: SchemaRef,
}
433
434/// Pre-built state for constructing [PrefilterContext] per row group.
435///
436/// Fields invariant across row groups (projection mask, codec, metadata, filters)
437/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
438/// is created via [PrefilterContextBuilder::build()] for each row group.
439pub(crate) struct PrefilterContextBuilder {
440    pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
441    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
442    filters: Vec<SimpleFilterContext>,
443    physical_filters: Vec<PhysicalFilterContext>,
444    codec: Arc<dyn PrimaryKeyCodec>,
445    metadata: RegionMetadataRef,
446    schema_version: u64,
447    arrow_schema: SchemaRef,
448}
449
450impl PrefilterContextBuilder {
451    /// Creates a builder if prefiltering is applicable.
452    ///
453    /// Returns `None` if:
454    /// - The read format doesn't use flat layout
455    /// - No prefilter columns are selected
456    /// - Prefilter would read the full projection without any PK filter
457    pub(crate) fn new(
458        read_format: &FlatReadFormat,
459        codec: &Arc<dyn PrimaryKeyCodec>,
460        primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
461        primary_key_filter_expr_strs: Option<SmallVec<[String; 1]>>,
462        filters: Vec<SimpleFilterContext>,
463        physical_filters: Vec<PhysicalFilterContext>,
464        schema_version: u64,
465    ) -> Option<Self> {
466        let metadata = read_format.metadata();
467        let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
468        let pk_filters = (!use_raw_tag_columns)
469            .then_some(primary_key_filters)
470            .flatten()
471            .filter(|filters| !filters.is_empty());
472        let pk_filter_expr_strs = pk_filters
473            .is_some()
474            .then_some(primary_key_filter_expr_strs)
475            .flatten();
476
477        let mut prefilter_column_names = HashSet::new();
478        for filter_ctx in &filters {
479            if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
480                prefilter_column_names.insert(filter.column_name().to_string());
481            }
482        }
483
484        if pk_filters.is_some() {
485            prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
486        }
487
488        for filter_ctx in &physical_filters {
489            prefilter_column_names.insert(filter_ctx.column_name().to_string());
490        }
491
492        let prefilter_count =
493            compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
494
495        if prefilter_count == 0 {
496            return None;
497        }
498
499        let total_count = read_format.parquet_read_columns().root_indices().len();
500        let remaining_count = total_count.saturating_sub(prefilter_count);
501        if pk_filters.is_none() && prefilter_count >= total_count {
502            return None;
503        }
504
505        if pk_filters.is_none()
506            && !should_use_prefilter(prefilter_count, remaining_count, total_count)
507        {
508            return None;
509        }
510
511        Some(Self {
512            pk_filters,
513            pk_filter_expr_strs,
514            filters,
515            physical_filters,
516            codec: Arc::clone(codec),
517            metadata: metadata.clone(),
518            schema_version,
519            arrow_schema: read_format.arrow_schema().clone(),
520        })
521    }
522
523    /// Builds a [PrefilterContext] for a specific row group.
524    pub(crate) fn build(&self) -> PrefilterContext {
525        let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
526            let pk_filter = self
527                .codec
528                .primary_key_filter(&self.metadata, Arc::clone(pk_filters));
529            Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
530        });
531        PrefilterContext {
532            pk_filter,
533            filters: self.filters.clone(),
534            physical_filters: self.physical_filters.clone(),
535            schema_version: self.schema_version,
536            pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
537            arrow_schema: self.arrow_schema.clone(),
538        }
539    }
540}
541
/// Upper bound on `prefilter_count / total_count` for the prefilter pass to
/// be considered worthwhile (see `should_use_prefilter`).
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
/// Minimum number of columns that must remain outside the prefilter
/// projection for the prefilter pass to be considered worthwhile
/// (see `should_use_prefilter`).
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;
544
/// Result of prefiltering a row group.
pub(crate) struct PrefilterResult {
    /// Refined row selection after prefiltering, to be used when reading the
    /// remaining columns.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows filtered out by prefiltering.
    pub(crate) filtered_rows: usize,
}
552
553/// Executes prefiltering on a row group.
554///
555/// Reads only the prefilter columns (currently the PK dictionary column),
556/// applies filters, and returns a refined [RowSelection].
557fn compute_projection_mask(
558    column_names: &HashSet<String>,
559    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
560    parquet_schema: &SchemaDescriptor,
561) -> ProjectionMask {
562    ProjectionMask::roots(
563        parquet_schema,
564        projection_indices(column_names, arrow_schema),
565    )
566}
567
568fn compute_projection_count(
569    column_names: &HashSet<String>,
570    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
571) -> usize {
572    projection_indices(column_names, arrow_schema).len()
573}
574
575fn projection_indices(
576    column_names: &HashSet<String>,
577    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
578) -> Vec<usize> {
579    let mut projection_indices: Vec<usize> = column_names
580        .iter()
581        .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
582        .collect();
583    projection_indices.sort_unstable();
584    projection_indices.dedup();
585    projection_indices
586}
587
588fn should_use_prefilter(
589    prefilter_count: usize,
590    remaining_count: usize,
591    total_count: usize,
592) -> bool {
593    if remaining_count == 0 {
594        return false;
595    }
596
597    if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
598        return false;
599    }
600
601    let ratio = prefilter_count as f64 / total_count as f64;
602    ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
603}
604
605pub(crate) async fn execute_prefilter(
606    prefilter_ctx: &mut PrefilterContext,
607    reader_builder: &RowGroupReaderBuilder,
608    build_ctx: &RowGroupBuildContext<'_>,
609) -> Result<PrefilterResult> {
610    let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
611
612    if entries.is_empty() {
613        return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
614            .await;
615    }
616
617    execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
618}
619
620async fn execute_prefilter_with_result_cache(
621    prefilter_ctx: &mut PrefilterContext,
622    reader_builder: &RowGroupReaderBuilder,
623    build_ctx: &RowGroupBuildContext<'_>,
624    entries: Vec<PrefilterEntry>,
625) -> Result<PrefilterResult> {
626    let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
627    let mut hit_mask: Option<BooleanBuffer> = None;
628    let mut misses = Vec::new();
629    for entry in entries {
630        let Some(key) = &entry.key else {
631            misses.push(entry);
632            continue;
633        };
634
635        if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
636            hit_mask = Some(match hit_mask {
637                Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
638                None => mask.as_ref().clone(),
639            });
640        } else {
641            misses.push(entry);
642        }
643    }
644
645    if misses.is_empty() && non_cacheable_physical.is_empty() {
646        let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
647        let refined_selection =
648            refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
649        let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
650        let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
651        return Ok(PrefilterResult {
652            refined_selection,
653            filtered_rows,
654        });
655    }
656
657    let mut uncached_entries = misses;
658    uncached_entries.extend(
659        non_cacheable_physical
660            .iter()
661            .copied()
662            .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
663    );
664    let (uncached_mask, read_rows) =
665        build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;
666
667    let final_mask = match (hit_mask, uncached_mask) {
668        (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
669        (Some(hit_mask), None) => hit_mask,
670        (None, Some(uncached_mask)) => uncached_mask,
671        (None, None) => BooleanBuffer::new_set(read_rows),
672    };
673    debug_assert_eq!(final_mask.len(), read_rows);
674    let rows_selected = final_mask.count_set_bits();
675    let filtered_rows = read_rows.saturating_sub(rows_selected);
676    let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
677
678    Ok(PrefilterResult {
679        refined_selection,
680        filtered_rows,
681    })
682}
683
684fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec<usize> {
685    prefilter_ctx
686        .physical_filters
687        .iter()
688        .enumerate()
689        .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx))
690        .collect()
691}
692
693async fn build_prefilter_masks(
694    prefilter_ctx: &mut PrefilterContext,
695    reader_builder: &RowGroupReaderBuilder,
696    build_ctx: &RowGroupBuildContext<'_>,
697    entries: &[PrefilterEntry],
698) -> Result<(Option<BooleanBuffer>, usize)> {
699    let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
700    let parquet_schema = reader_builder
701        .parquet_metadata()
702        .file_metadata()
703        .schema_descr();
704    let projection = compute_projection_mask(
705        &prefilter_column_names,
706        &prefilter_ctx.arrow_schema,
707        parquet_schema,
708    );
709
710    let mut stream = reader_builder
711        .build_with_projection(
712            build_ctx.row_group_idx,
713            build_ctx.row_selection.clone(),
714            projection,
715            build_ctx.fetch_metrics,
716        )
717        .await?;
718
719    let mut cache_builders = entries
720        .iter()
721        .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
722        .collect::<Vec<_>>();
723    let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
724    let mut rows_before_filter = 0usize;
725
726    while let Some(batch_result) = stream.next().await {
727        let batch = batch_result?;
728        let num_rows = batch.num_rows();
729        if num_rows == 0 {
730            continue;
731        }
732        rows_before_filter += num_rows;
733
734        let mut batch_mask = BooleanBuffer::new_set(num_rows);
735        for (idx, entry) in entries.iter().enumerate() {
736            let mask = eval_entry_mask(
737                &batch,
738                prefilter_ctx,
739                entry.kind,
740                reader_builder.file_path(),
741            )?;
742            batch_mask = batch_mask.bitand(&mask);
743            if let Some(Some(builder)) = cache_builders.get_mut(idx) {
744                builder.append_buffer(&mask);
745            }
746        }
747        if let Some(builder) = &mut combined_builder {
748            builder.append_buffer(&batch_mask);
749        }
750    }
751
752    for (entry, builder) in entries.iter().zip(cache_builders) {
753        if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
754            reader_builder
755                .cache_strategy()
756                .put_prefilter_result(key.clone(), Arc::new(builder.finish()));
757        }
758    }
759
760    Ok((
761        combined_builder.map(|mut builder| builder.finish()),
762        rows_before_filter,
763    ))
764}
765
766fn prefilter_column_names_for_entries(
767    prefilter_ctx: &PrefilterContext,
768    entries: &[PrefilterEntry],
769) -> HashSet<String> {
770    let mut prefilter_column_names = HashSet::new();
771    for entry in entries {
772        match entry.kind {
773            PrefilterEntryKind::Simple(idx) => {
774                if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() {
775                    prefilter_column_names.insert(filter.column_name().to_string());
776                }
777            }
778            PrefilterEntryKind::Physical(idx) => {
779                prefilter_column_names.insert(
780                    prefilter_ctx.physical_filters[idx]
781                        .column_name()
782                        .to_string(),
783                );
784            }
785            PrefilterEntryKind::PkGroup => {
786                prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
787            }
788        }
789    }
790    prefilter_column_names
791}
792
793async fn execute_prefilter_by_reading_columns(
794    prefilter_ctx: &mut PrefilterContext,
795    reader_builder: &RowGroupReaderBuilder,
796    build_ctx: &RowGroupBuildContext<'_>,
797) -> Result<PrefilterResult> {
798    let entries = all_prefilter_entries(prefilter_ctx);
799    let (mask, rows_before_filter) =
800        build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
801
802    let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
803    let rows_selected = final_mask.count_set_bits();
804    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
805    let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
806
807    Ok(PrefilterResult {
808        refined_selection,
809        filtered_rows,
810    })
811}
812
813fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec<PrefilterEntry> {
814    let mut entries = Vec::new();
815    if prefilter_ctx.pk_filter.is_some() {
816        entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup));
817    }
818    entries.extend(
819        prefilter_ctx
820            .filters
821            .iter()
822            .enumerate()
823            .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))),
824    );
825    entries.extend(
826        prefilter_ctx
827            .physical_filters
828            .iter()
829            .enumerate()
830            .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
831    );
832    entries
833}
834
/// Identifies which filter of a `PrefilterContext` a prefilter entry evaluates.
#[derive(Clone, Copy)]
enum PrefilterEntryKind {
    /// Index into the context's `filters` (simple filters).
    Simple(usize),
    /// Index into the context's `physical_filters`.
    Physical(usize),
    /// The context's primary key filter, evaluated on the encoded primary key column.
    PkGroup,
}
841
/// A single filter to evaluate during the prefilter phase, optionally paired
/// with the key used to store its result in the prefilter cache.
struct PrefilterEntry {
    /// Which filter this entry evaluates.
    kind: PrefilterEntryKind,
    /// Cache key for the entry's result; `None` when the entry is not cached.
    key: Option<PrefilterKey>,
}
846
847impl PrefilterEntry {
848    fn without_cache(kind: PrefilterEntryKind) -> Self {
849        Self { kind, key: None }
850    }
851}
852
853fn build_prefilter_cache_entries(
854    prefilter_ctx: &PrefilterContext,
855    reader_builder: &RowGroupReaderBuilder,
856    build_ctx: &RowGroupBuildContext<'_>,
857) -> Vec<PrefilterEntry> {
858    let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
859    let file_id = reader_builder.file_handle().file_id().file_id();
860    let row_group_idx = build_ctx.row_group_idx as u32;
861    let mut entries = Vec::new();
862
863    for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() {
864        entries.push(PrefilterEntry {
865            kind: PrefilterEntryKind::Simple(idx),
866            key: Some(PrefilterKey::new(
867                file_id,
868                row_group_idx,
869                row_selection.clone(),
870                prefilter_ctx.schema_version,
871                smallvec![filter_ctx.expr_str().to_string()],
872            )),
873        });
874    }
875
876    for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() {
877        if !filter_ctx.is_immutable() {
878            continue;
879        }
880        entries.push(PrefilterEntry {
881            kind: PrefilterEntryKind::Physical(idx),
882            key: Some(PrefilterKey::new(
883                file_id,
884                row_group_idx,
885                row_selection.clone(),
886                prefilter_ctx.schema_version,
887                smallvec![filter_ctx.expr_str().to_string()],
888            )),
889        });
890    }
891
892    if prefilter_ctx.pk_filter.is_some()
893        && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs
894    {
895        entries.push(PrefilterEntry {
896            kind: PrefilterEntryKind::PkGroup,
897            key: Some(PrefilterKey::new(
898                file_id,
899                row_group_idx,
900                row_selection,
901                prefilter_ctx.schema_version,
902                exprs.clone(),
903            )),
904        });
905    }
906
907    entries
908}
909
910fn rows_before_filter(
911    reader_builder: &RowGroupReaderBuilder,
912    build_ctx: &RowGroupBuildContext<'_>,
913) -> usize {
914    build_ctx.row_selection.as_ref().map_or_else(
915        || {
916            reader_builder
917                .parquet_metadata()
918                .row_group(build_ctx.row_group_idx)
919                .num_rows() as usize
920        },
921        RowSelection::row_count,
922    )
923}
924
925fn refined_selection_from_mask(
926    mask: &BooleanBuffer,
927    original_selection: &Option<RowSelection>,
928) -> RowSelection {
929    if mask.is_empty() || mask.count_set_bits() == 0 {
930        return RowSelection::from(vec![]);
931    }
932
933    let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]);
934    match original_selection {
935        Some(original) => original.and_then(&prefilter_selection),
936        None => prefilter_selection,
937    }
938}
939
940fn eval_entry_mask(
941    batch: &RecordBatch,
942    prefilter_ctx: &mut PrefilterContext,
943    kind: PrefilterEntryKind,
944    file_path: &str,
945) -> Result<BooleanBuffer> {
946    match kind {
947        PrefilterEntryKind::Simple(idx) => {
948            eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path)
949        }
950        PrefilterEntryKind::Physical(idx) => {
951            eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path)
952        }
953        PrefilterEntryKind::PkGroup => {
954            let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu {
955                reason: "Missing primary key filter for prefilter cache entry",
956            })?;
957            eval_pk_group_mask(batch, pk_filter.as_mut())
958        }
959    }
960}
961
962fn eval_pk_group_mask(
963    batch: &RecordBatch,
964    pk_filter: &mut dyn PrimaryKeyFilter,
965) -> Result<BooleanBuffer> {
966    let (pk_column_index, _) = batch
967        .schema()
968        .column_with_name(PRIMARY_KEY_COLUMN_NAME)
969        .context(UnexpectedSnafu {
970            reason: "Primary key column not found in prefilter batch",
971        })?;
972    let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?;
973    let mut builder = BooleanBufferBuilder::new(batch.num_rows());
974    builder.append_n(batch.num_rows(), false);
975    for range in matched_row_ranges {
976        for row in range {
977            builder.set_bit(row, true);
978        }
979    }
980    Ok(builder.finish())
981}
982
983fn eval_simple_filter_mask(
984    batch: &RecordBatch,
985    filter_ctx: &SimpleFilterContext,
986    file_path: &str,
987) -> Result<BooleanBuffer> {
988    let filter = match filter_ctx.filter() {
989        MaybeFilter::Filter(filter) => filter,
990        MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(batch.num_rows())),
991        MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(batch.num_rows())),
992    };
993
994    let (idx, _) = batch
995        .schema()
996        .column_with_name(filter.column_name())
997        .with_context(|| UnexpectedSnafu {
998            reason: format!(
999                "Prefilter column '{}' (id {}) not found in batch for file {}",
1000                filter.column_name(),
1001                filter_ctx.column_id(),
1002                file_path
1003            ),
1004        })?;
1005    let column = batch.column(idx).clone();
1006    filter.evaluate_array(&column).context(RecordBatchSnafu)
1007}
1008
1009fn eval_physical_filter_mask(
1010    batch: &RecordBatch,
1011    filter_ctx: &PhysicalFilterContext,
1012    file_path: &str,
1013) -> Result<BooleanBuffer> {
1014    let filter = filter_ctx.filter();
1015
1016    let (idx, _) = batch
1017        .schema()
1018        .column_with_name(filter_ctx.column_name())
1019        .with_context(|| UnexpectedSnafu {
1020            reason: format!(
1021                "Prefilter physical column '{}' (id {}) not found in batch for file {}",
1022                filter_ctx.column_name(),
1023                filter_ctx.column_id(),
1024                file_path
1025            ),
1026        })?;
1027    let column = batch.column(idx).clone();
1028
1029    let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
1030        .context(NewRecordBatchSnafu)?;
1031    let evaluated = filter
1032        .evaluate(&record_batch)
1033        .context(EvalPartitionFilterSnafu)?;
1034    let array = evaluated
1035        .into_array(record_batch.num_rows())
1036        .context(EvalPartitionFilterSnafu)?;
1037    let boolean_array = array
1038        .as_any()
1039        .downcast_ref::<BooleanArray>()
1040        .context(UnexpectedSnafu {
1041            reason: "Failed to downcast physical filter result to BooleanArray",
1042        })?;
1043    // Treat null results as false (filtered out); value bits are not guaranteed
1044    // to be false for invalid entries.
1045    let mut result = boolean_array.values().clone();
1046    if let Some(nulls) = boolean_array.nulls() {
1047        result = result.bitand(nulls.inner());
1048    }
1049    Ok(result)
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054    use std::sync::Arc;
1055    use std::sync::atomic::{AtomicUsize, Ordering};
1056
1057    use common_recordbatch::filter::SimpleFilterEvaluator;
1058    use datafusion_expr::{col, lit};
1059    use datatypes::arrow::array::{
1060        ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
1061    };
1062    use datatypes::arrow::datatypes::{Schema, UInt32Type};
1063    use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
1064    use store_api::codec::PrimaryKeyEncoding;
1065
1066    use super::*;
1067    use crate::read::read_columns::ReadColumns;
1068    use crate::sst::internal_fields;
1069    use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
1070    use crate::test_util::sst_util::{
1071        new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
1072        sst_region_metadata_with_encoding,
1073    };
1074
    /// Test filter that matches a single encoded primary key and counts how
    /// often it is consulted.
    struct CountingPrimaryKeyFilter {
        /// Incremented on every `matches` call.
        hits: Arc<AtomicUsize>,
        /// The only encoded primary key that matches.
        expected: Vec<u8>,
    }
1079
1080    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
1081        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
1082            self.hits.fetch_add(1, Ordering::Relaxed);
1083            Ok(pk == self.expected.as_slice())
1084        }
1085    }
1086
1087    #[test]
1088    fn test_cached_primary_key_filter_reuses_previous_result() {
1089        let expected = new_primary_key(&["a", "x"]);
1090        let hits = Arc::new(AtomicUsize::new(0));
1091        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
1092            hits: Arc::clone(&hits),
1093            expected: expected.clone(),
1094        }));
1095
1096        assert!(filter.matches(expected.as_slice()).unwrap());
1097        assert!(filter.matches(expected.as_slice()).unwrap());
1098        assert!(
1099            !filter
1100                .matches(new_primary_key(&["b", "x"]).as_slice())
1101                .unwrap()
1102        );
1103
1104        assert_eq!(hits.load(Ordering::Relaxed), 2);
1105    }
1106
1107    fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
1108        exprs
1109            .iter()
1110            .filter_map(SimpleFilterEvaluator::try_new)
1111            .collect()
1112    }
1113
1114    fn new_simple_filter_contexts(
1115        metadata: &RegionMetadataRef,
1116        exprs: &[datafusion_expr::Expr],
1117    ) -> Vec<SimpleFilterContext> {
1118        exprs
1119            .iter()
1120            .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
1121            .collect()
1122    }
1123
1124    fn new_physical_filter_contexts(
1125        metadata: &RegionMetadataRef,
1126        read_format: &FlatReadFormat,
1127        exprs: &[datafusion_expr::Expr],
1128    ) -> Vec<PhysicalFilterContext> {
1129        exprs
1130            .iter()
1131            .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
1132            .collect()
1133    }
1134
1135    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1136        assert_eq!(primary_keys.len(), field_values.len());
1137
1138        let metadata = Arc::new(sst_region_metadata());
1139        let arrow_schema = metadata.schema.arrow_schema();
1140        let field_column = arrow_schema
1141            .field(arrow_schema.index_of("field_0").unwrap())
1142            .clone();
1143        let time_index_column = arrow_schema
1144            .field(arrow_schema.index_of("ts").unwrap())
1145            .clone();
1146        let mut fields = vec![field_column, time_index_column];
1147        fields.extend(
1148            internal_fields()
1149                .into_iter()
1150                .map(|field| field.as_ref().clone()),
1151        );
1152        let schema = Arc::new(Schema::new(fields));
1153
1154        let mut dict_values = Vec::new();
1155        let mut keys = Vec::with_capacity(primary_keys.len());
1156        for pk in primary_keys {
1157            let key = dict_values
1158                .iter()
1159                .position(|existing: &&[u8]| existing == pk)
1160                .unwrap_or_else(|| {
1161                    dict_values.push(*pk);
1162                    dict_values.len() - 1
1163                });
1164            keys.push(key as u32);
1165        }
1166        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1167            UInt32Array::from(keys),
1168            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1169        ));
1170
1171        RecordBatch::try_new(
1172            schema,
1173            vec![
1174                Arc::new(UInt64Array::from(field_values.to_vec())),
1175                Arc::new(TimestampMillisecondArray::from_iter_values(
1176                    0..primary_keys.len() as i64,
1177                )),
1178                pk_array,
1179                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
1180                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
1181            ],
1182        )
1183        .unwrap()
1184    }
1185
1186    fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1187        assert_eq!(primary_keys.len(), field_values.len());
1188
1189        let metadata = Arc::new(sst_region_metadata());
1190        let arrow_schema = metadata.schema.arrow_schema();
1191        let field_column = arrow_schema
1192            .field(arrow_schema.index_of("field_0").unwrap())
1193            .clone();
1194        let time_index_column = arrow_schema
1195            .field(arrow_schema.index_of("ts").unwrap())
1196            .clone();
1197        let schema = Arc::new(Schema::new(vec![
1198            field_column,
1199            time_index_column,
1200            internal_fields()[0].as_ref().clone(),
1201        ]));
1202
1203        let mut dict_values = Vec::new();
1204        let mut keys = Vec::with_capacity(primary_keys.len());
1205        for pk in primary_keys {
1206            let key = dict_values
1207                .iter()
1208                .position(|existing: &&[u8]| existing == pk)
1209                .unwrap_or_else(|| {
1210                    dict_values.push(*pk);
1211                    dict_values.len() - 1
1212                });
1213            keys.push(key as u32);
1214        }
1215        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1216            UInt32Array::from(keys),
1217            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1218        ));
1219
1220        RecordBatch::try_new(
1221            schema,
1222            vec![
1223                Arc::new(UInt64Array::from(field_values.to_vec())),
1224                Arc::new(TimestampMillisecondArray::from_iter_values(
1225                    0..primary_keys.len() as i64,
1226                )),
1227                pk_array,
1228            ],
1229        )
1230        .unwrap()
1231    }
1232
1233    fn field_values(batch: &RecordBatch) -> Vec<u64> {
1234        batch
1235            .column(0)
1236            .as_any()
1237            .downcast_ref::<UInt64Array>()
1238            .unwrap()
1239            .values()
1240            .to_vec()
1241    }
1242
1243    fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
1244        filters
1245            .iter()
1246            .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
1247            .collect()
1248    }
1249
1250    #[test]
1251    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
1252        let metadata = Arc::new(sst_region_metadata());
1253        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
1254        let mut primary_key_filter =
1255            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
1256        let pk_a = new_primary_key(&["a", "x"]);
1257        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
1258        let pk_col_idx = primary_key_column_index(batch.num_columns());
1259
1260        let filtered =
1261            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
1262                .unwrap();
1263
1264        assert!(filtered.is_none());
1265    }
1266
1267    #[test]
1268    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
1269        let metadata = Arc::new(sst_region_metadata());
1270        let filters = Arc::new(new_test_filters(&[col("tag_0")
1271            .eq(lit("a"))
1272            .or(col("tag_0").eq(lit("c")))]));
1273        let mut primary_key_filter =
1274            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
1275        let pk_a = new_primary_key(&["a", "x"]);
1276        let pk_b = new_primary_key(&["b", "x"]);
1277        let pk_c = new_primary_key(&["c", "x"]);
1278        let pk_d = new_primary_key(&["d", "x"]);
1279        let batch = new_raw_batch(
1280            &[
1281                pk_a.as_slice(),
1282                pk_a.as_slice(),
1283                pk_b.as_slice(),
1284                pk_b.as_slice(),
1285                pk_c.as_slice(),
1286                pk_c.as_slice(),
1287                pk_d.as_slice(),
1288                pk_d.as_slice(),
1289            ],
1290            &[10, 11, 12, 13, 14, 15, 16, 17],
1291        );
1292        let pk_col_idx = primary_key_column_index(batch.num_columns());
1293
1294        let filtered =
1295            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
1296                .unwrap()
1297                .unwrap();
1298
1299        assert_eq!(filtered.num_rows(), 4);
1300        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
1301    }
1302
1303    #[test]
1304    fn test_prefilter_builder_returns_none_without_selected_filters() {
1305        let metadata: RegionMetadataRef =
1306            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
1307        let read_format = FlatReadFormat::new(
1308            metadata.clone(),
1309            ReadColumns::from_deduped_column_ids(
1310                metadata.column_metadatas.iter().map(|c| c.column_id),
1311            ),
1312            None,
1313            "test",
1314            false,
1315        )
1316        .unwrap();
1317        let codec = build_primary_key_codec(metadata.as_ref());
1318
1319        let builder = PrefilterContextBuilder::new(
1320            &read_format,
1321            &codec,
1322            None,
1323            None,
1324            Vec::new(),
1325            Vec::new(),
1326            metadata.schema_version,
1327        );
1328        assert!(builder.is_none());
1329    }
1330
1331    #[test]
1332    fn test_should_use_prefilter() {
1333        assert!(should_use_prefilter(1, 5, 6));
1334        assert!(!should_use_prefilter(1, 0, 1));
1335        assert!(!should_use_prefilter(1, 1, 2));
1336        assert!(!should_use_prefilter(4, 3, 7));
1337        assert!(should_use_prefilter(3, 3, 6));
1338    }
1339
1340    #[test]
1341    fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
1342        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
1343            PrimaryKeyEncoding::Sparse,
1344        ));
1345        let legacy_read_format = FlatReadFormat::new(
1346            metadata.clone(),
1347            ReadColumns::from_deduped_column_ids(
1348                metadata.column_metadatas.iter().map(|c| c.column_id),
1349            ),
1350            None,
1351            "memtable",
1352            false,
1353        )
1354        .unwrap();
1355        assert!(!legacy_read_format.batch_has_raw_pk_columns());
1356
1357        let plan = build_bulk_filter_plan(
1358            &legacy_read_format,
1359            Some(&Predicate::new(vec![
1360                col("tag_0").eq(lit("a")),
1361                col("field_0").gt(lit(1_u64)),
1362            ])),
1363        );
1364        assert_eq!(
1365            plan.pk_filters.as_ref().map(|filters| filters.len()),
1366            Some(1)
1367        );
1368        assert_eq!(
1369            remaining_simple_filter_columns(&plan.remaining_simple_filters),
1370            vec!["field_0"]
1371        );
1372
1373        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1374        let raw_pk_read_format = FlatReadFormat::new(
1375            metadata.clone(),
1376            ReadColumns::from_deduped_column_ids(
1377                metadata.column_metadatas.iter().map(|c| c.column_id),
1378            ),
1379            None,
1380            "memtable",
1381            true,
1382        )
1383        .unwrap();
1384        assert!(raw_pk_read_format.batch_has_raw_pk_columns());
1385
1386        let tag_only_plan = build_bulk_filter_plan(
1387            &raw_pk_read_format,
1388            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
1389        );
1390        assert!(tag_only_plan.pk_filters.is_none());
1391        assert_eq!(
1392            remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
1393            vec!["tag_0"]
1394        );
1395
1396        let field_only_plan = build_bulk_filter_plan(
1397            &raw_pk_read_format,
1398            Some(&Predicate::new(vec![col("field_0").gt(lit(1_u64))])),
1399        );
1400        assert!(field_only_plan.pk_filters.is_none());
1401        assert_eq!(
1402            remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
1403            vec!["field_0"]
1404        );
1405    }
1406
1407    #[test]
1408    fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
1409        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1410        let full_read_format = FlatReadFormat::new(
1411            metadata.clone(),
1412            ReadColumns::from_deduped_column_ids(
1413                metadata.column_metadatas.iter().map(|c| c.column_id),
1414            ),
1415            None,
1416            "test",
1417            true,
1418        )
1419        .unwrap();
1420        let codec = build_primary_key_codec(metadata.as_ref());
1421
1422        let skip_fields_plan = build_reader_filter_plan(
1423            Some(&Predicate::new(vec![
1424                col("tag_0").eq(lit("a")),
1425                col("field_0").gt(lit(1_u64)),
1426            ])),
1427            None,
1428            PreFilterMode::SkipFields,
1429            &full_read_format,
1430            &codec,
1431        );
1432        assert!(skip_fields_plan.prefilter_builder.is_some());
1433        assert_eq!(
1434            remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
1435            vec!["field_0"]
1436        );
1437
1438        let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
1439        let ts = metadata.time_index_column().column_id;
1440        let projected_read_format = FlatReadFormat::new(
1441            metadata.clone(),
1442            ReadColumns::from_deduped_column_ids([field_0, ts]),
1443            None,
1444            "test",
1445            true,
1446        )
1447        .unwrap();
1448        let pk_prefilter_plan = build_reader_filter_plan(
1449            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
1450            None,
1451            PreFilterMode::All,
1452            &projected_read_format,
1453            &codec,
1454        );
1455        assert!(pk_prefilter_plan.prefilter_builder.is_some());
1456        assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
1457    }
1458
1459    #[test]
1460    fn test_pk_filter_expr_strings_are_stable_under_expr_order() {
1461        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
1462            PrimaryKeyEncoding::Sparse,
1463        ));
1464        let read_format = FlatReadFormat::new(
1465            metadata.clone(),
1466            ReadColumns::from_deduped_column_ids(
1467                metadata.column_metadatas.iter().map(|c| c.column_id),
1468            ),
1469            None,
1470            "test",
1471            false,
1472        )
1473        .unwrap();
1474        let codec = build_primary_key_codec(metadata.as_ref());
1475
1476        let expr_a = col("tag_0").eq(lit("a"));
1477        let expr_b = col("tag_1").eq(lit("x"));
1478        let plan_ab = build_reader_filter_plan(
1479            Some(&Predicate::new(vec![expr_a.clone(), expr_b.clone()])),
1480            None,
1481            PreFilterMode::All,
1482            &read_format,
1483            &codec,
1484        );
1485        let plan_b_a = build_reader_filter_plan(
1486            Some(&Predicate::new(vec![expr_b, expr_a])),
1487            None,
1488            PreFilterMode::All,
1489            &read_format,
1490            &codec,
1491        );
1492
1493        let exprs_ab = plan_ab.prefilter_builder.unwrap().pk_filter_expr_strs;
1494        let exprs_b_a = plan_b_a.prefilter_builder.unwrap().pk_filter_expr_strs;
1495        assert!(exprs_ab.is_some());
1496        assert_eq!(exprs_ab, exprs_b_a);
1497    }
1498
1499    #[test]
1500    fn test_simple_and_physical_contexts_preserve_expr_strings() {
1501        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1502        let read_format = FlatReadFormat::new(
1503            metadata.clone(),
1504            ReadColumns::from_deduped_column_ids(
1505                metadata.column_metadatas.iter().map(|c| c.column_id),
1506            ),
1507            None,
1508            "test",
1509            true,
1510        )
1511        .unwrap();
1512
1513        let simple_expr = col("tag_0").eq(lit("a"));
1514        let simple = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap();
1515        assert_eq!(simple.expr_str(), format!("{simple_expr:?}"));
1516
1517        let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false);
1518        let physical =
1519            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap();
1520        assert_eq!(physical.expr_str(), format!("{physical_expr:?}"));
1521    }
1522
1523    #[test]
1524    fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() {
1525        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1526        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
1527        let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);
1528
1529        let mask = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap();
1530        assert_eq!(mask.count_set_bits(), 4);
1531    }
1532
1533    #[test]
1534    fn test_eval_simple_filter_mask_errors_on_missing_selected_column() {
1535        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1536        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
1537        let pk = new_primary_key(&["a", "x"]);
1538        let batch = new_raw_batch(&[pk.as_slice()], &[10]);
1539
1540        let err = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap_err();
1541        let err = err.to_string();
1542        assert!(err.contains("Prefilter column"));
1543        assert!(err.contains("tag_0"));
1544    }
1545
1546    #[test]
1547    fn test_eval_physical_filter_mask_evaluates_physical_filters() {
1548        let metadata: RegionMetadataRef =
1549            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
1550        let read_format = FlatReadFormat::new(
1551            metadata.clone(),
1552            ReadColumns::from_deduped_column_ids(
1553                metadata.column_metadatas.iter().map(|c| c.column_id),
1554            ),
1555            None,
1556            "test",
1557            false,
1558        )
1559        .unwrap();
1560        let expr = col("field_0").in_list(vec![lit(11_u64)], false);
1561        let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[expr]);
1562        let pk = new_primary_key(&["a", "x"]);
1563        let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]);
1564
1565        let mask = eval_physical_filter_mask(&batch, &physical_filters[0], "test").unwrap();
1566        assert_eq!(mask.count_set_bits(), 1);
1567    }
1568
1569    #[test]
1570    fn test_eval_pk_group_mask_finds_pk_column_by_name() {
1571        let metadata = Arc::new(sst_region_metadata());
1572        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
1573        let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
1574            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters),
1575        )) as Box<dyn PrimaryKeyFilter>);
1576        let pk_a = new_primary_key(&["a", "x"]);
1577        let pk_b = new_primary_key(&["b", "x"]);
1578        let batch = new_prefilter_batch(
1579            &[
1580                pk_a.as_slice(),
1581                pk_a.as_slice(),
1582                pk_b.as_slice(),
1583                pk_b.as_slice(),
1584            ],
1585            &[10, 11, 12, 13],
1586        );
1587
1588        let mask = eval_pk_group_mask(&batch, pk_filter.as_mut().unwrap().as_mut()).unwrap();
1589
1590        assert_eq!(mask.count_set_bits(), 2);
1591    }
1592}