Skip to main content

mito2/sst/parquet/
prefilter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prefilter framework for parquet reader.
16//!
17//! Prefilter optimization reduces I/O by reading only a subset of columns first
18//! (the prefilter phase), applying filters to compute a refined row selection,
19//! then reading the remaining columns with the refined selection.
20
21use std::collections::HashSet;
22use std::ops::{BitAnd, Range};
23use std::sync::Arc;
24
25use api::v1::SemanticType;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::datatypes::SchemaRef;
30use datatypes::arrow::record_batch::RecordBatch;
31use futures::StreamExt;
32use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
33use parquet::arrow::ProjectionMask;
34use parquet::arrow::arrow_reader::RowSelection;
35use parquet::schema::types::SchemaDescriptor;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
40use table::predicate::Predicate;
41
42use crate::cache::PrefilterKey;
43use crate::error::{
44    ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
45    RecordBatchSnafu, Result, UnexpectedSnafu,
46};
47use crate::sst::parquet::file_range::PreFilterMode;
48use crate::sst::parquet::flat_format::FlatReadFormat;
49use crate::sst::parquet::format::PrimaryKeyArray;
50use crate::sst::parquet::reader::{
51    MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
52    SimpleFilterContext,
53};
54
55pub(crate) fn matching_row_ranges_by_primary_key(
56    input: &RecordBatch,
57    pk_column_index: usize,
58    pk_filter: &mut dyn PrimaryKeyFilter,
59) -> Result<Vec<Range<usize>>> {
60    let pk_column = input.column(pk_column_index);
61    if let Some(pk_dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
62        matching_row_ranges_from_dict(pk_dict_array, input.num_rows(), pk_filter)
63    } else if let Some(pk_binary_array) = pk_column.as_any().downcast_ref::<BinaryArray>() {
64        matching_row_ranges_from_binary(pk_binary_array, input.num_rows(), pk_filter)
65    } else {
66        UnexpectedSnafu {
67            reason: format!(
68                "Primary key column is neither a dictionary nor a binary array, got {:?}",
69                pk_column.data_type()
70            ),
71        }
72        .fail()
73    }
74}
75
76/// If `pk_filter` matches `pk`, records the row range `start..end`, coalescing
77/// it with the previous range when adjacent.
78fn push_matched_range(
79    matched_row_ranges: &mut Vec<Range<usize>>,
80    pk_filter: &mut dyn PrimaryKeyFilter,
81    pk: &[u8],
82    start: usize,
83    end: usize,
84) -> Result<()> {
85    if pk_filter.matches(pk).context(DecodeSnafu)? {
86        if let Some(last) = matched_row_ranges.last_mut()
87            && last.end == start
88        {
89            last.end = end;
90        } else {
91            matched_row_ranges.push(start..end);
92        }
93    }
94    Ok(())
95}
96
97/// Computes matched row ranges from a dictionary-encoded `__primary_key` column.
98fn matching_row_ranges_from_dict(
99    pk_dict_array: &PrimaryKeyArray,
100    num_rows: usize,
101    pk_filter: &mut dyn PrimaryKeyFilter,
102) -> Result<Vec<Range<usize>>> {
103    let pk_values = pk_dict_array
104        .values()
105        .as_any()
106        .downcast_ref::<BinaryArray>()
107        .context(UnexpectedSnafu {
108            reason: "Primary key values are not binary array",
109        })?;
110    let keys = pk_dict_array.keys();
111    let key_values = keys.values();
112
113    if key_values.is_empty() {
114        return Ok(std::iter::once(0..num_rows).collect());
115    }
116
117    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
118    let mut start = 0;
119    while start < key_values.len() {
120        let key = key_values[start];
121        let mut end = start + 1;
122        while end < key_values.len() && key_values[end] == key {
123            end += 1;
124        }
125
126        push_matched_range(
127            &mut matched_row_ranges,
128            pk_filter,
129            pk_values.value(key as usize),
130            start,
131            end,
132        )?;
133
134        start = end;
135    }
136
137    Ok(matched_row_ranges)
138}
139
140/// Computes matched row ranges from a plain binary `__primary_key` column.
141///
142/// The writer falls back to plain binary encoding when the `__primary_key`
143/// chunk exceeds the dictionary page size limit (see `should_read_pk_as_binary`
144/// in the parquet reader), so the prefilter pass must handle this form too.
145fn matching_row_ranges_from_binary(
146    pk_array: &BinaryArray,
147    num_rows: usize,
148    pk_filter: &mut dyn PrimaryKeyFilter,
149) -> Result<Vec<Range<usize>>> {
150    if pk_array.is_empty() {
151        return Ok(std::iter::once(0..num_rows).collect());
152    }
153
154    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
155    let mut start = 0;
156    while start < pk_array.len() {
157        let value = pk_array.value(start);
158        let mut end = start + 1;
159        while end < pk_array.len() && pk_array.value(end) == value {
160            end += 1;
161        }
162
163        push_matched_range(&mut matched_row_ranges, pk_filter, value, start, end)?;
164
165        start = end;
166    }
167
168    Ok(matched_row_ranges)
169}
170
171/// Filters a flat-format record batch by primary key, returning only rows whose
172/// primary key matches the filter. Returns `None` if all rows are filtered out.
173pub(crate) fn prefilter_flat_batch_by_primary_key(
174    input: RecordBatch,
175    pk_column_index: usize,
176    pk_filter: &mut dyn PrimaryKeyFilter,
177) -> Result<Option<RecordBatch>> {
178    if input.num_rows() == 0 {
179        return Ok(Some(input));
180    }
181
182    let matched_row_ranges =
183        matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
184    if matched_row_ranges.is_empty() {
185        return Ok(None);
186    }
187
188    if matched_row_ranges.len() == 1
189        && matched_row_ranges[0].start == 0
190        && matched_row_ranges[0].end == input.num_rows()
191    {
192        return Ok(Some(input));
193    }
194
195    if matched_row_ranges.len() == 1 {
196        let span = &matched_row_ranges[0];
197        return Ok(Some(input.slice(span.start, span.end - span.start)));
198    }
199
200    let mut builder = BooleanBufferBuilder::new(input.num_rows());
201    builder.append_n(input.num_rows(), false);
202    for span in matched_row_ranges {
203        for i in span {
204            builder.set_bit(i, true);
205        }
206    }
207
208    let filtered = datatypes::arrow::compute::filter_record_batch(
209        &input,
210        &BooleanArray::new(builder.finish(), None),
211    )
212    .context(ComputeArrowSnafu)?;
213    if filtered.num_rows() == 0 {
214        Ok(None)
215    } else {
216        Ok(Some(filtered))
217    }
218}
219
220pub(crate) struct CachedPrimaryKeyFilter {
221    inner: Box<dyn PrimaryKeyFilter>,
222    last_primary_key: Vec<u8>,
223    last_match: Option<bool>,
224}
225
226impl CachedPrimaryKeyFilter {
227    pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
228        Self {
229            inner,
230            last_primary_key: Vec::new(),
231            last_match: None,
232        }
233    }
234}
235
236impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
237    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
238        if let Some(last_match) = self.last_match
239            && self.last_primary_key == pk
240        {
241            return Ok(last_match);
242        }
243
244        let matched = self.inner.matches(pk)?;
245        self.last_primary_key.clear();
246        self.last_primary_key.extend_from_slice(pk);
247        self.last_match = Some(matched);
248        Ok(matched)
249    }
250}
251
252/// How the bulk-memtable read should apply each predicate.
253///
254/// Unlike the parquet reader, the bulk path has no prefilter pass; predicates
255/// either run row-wise inside the iterator or are pushed down to encoded-PK
256/// matching when the batch still carries the primary-key column.
257pub(crate) struct BulkFilterPlan {
258    /// Simple filters the iterator still has to evaluate row-wise on each batch.
259    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
260    /// Tag predicates lowered to encoded-PK filters. `None` when the batch
261    /// already exposes raw tag columns or there are no tag predicates.
262    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
263}
264
265/// How the parquet reader should apply each predicate.
266///
267/// The reader runs in two phases. Predicates routed into `prefilter_builder`
268/// execute on a reduced column set first to compute a refined row selection;
269/// `remaining_simple_filters` execute alongside the full projection on the
270/// normal read path. The contract for what is precise vs best-effort is
271/// documented on [`build_reader_filter_plan`].
272pub(crate) struct ReaderFilterPlan {
273    /// Simple filters that must run on the normal read path: predicates with
274    /// `Matched` / `Pruned` outcomes (which carry expected-metadata
275    /// compatibility decisions later phases rely on), and predicates whose
276    /// column cannot be read directly during the prefilter pass.
277    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
278    /// Pre-built state for the prefilter pass, or `None` when prefiltering is
279    /// not worthwhile (no prefilter columns selected, or the prefilter
280    /// projection would cover nearly the full read).
281    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
282}
283
284pub(crate) fn build_bulk_filter_plan(
285    read_format: &FlatReadFormat,
286    predicate: Option<&Predicate>,
287) -> BulkFilterPlan {
288    let metadata = read_format.metadata();
289    // Bulk memtable only needs simple binary filters here. Any filter that
290    // cannot be reduced to a SimpleFilterContext stays out of this fast path.
291    let simple_filters: Vec<SimpleFilterContext> = predicate
292        .into_iter()
293        .flat_map(|predicate| {
294            predicate
295                .exprs()
296                .iter()
297                .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
298        })
299        .collect();
300
301    // PK prefilter only works when flat batches still carry the encoded PK
302    // column. If tags have already been expanded to raw columns, the iterator
303    // can apply those filters directly and there is nothing to extract here.
304    if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
305        return BulkFilterPlan {
306            remaining_simple_filters: simple_filters,
307            pk_filters: None,
308        };
309    }
310
311    let mut remaining_simple_filters = Vec::new();
312    let mut pk_filters = Vec::new();
313
314    for filter_ctx in simple_filters {
315        // Split tag predicates that can be evaluated against the encoded PK
316        // from filters that still need normal row-wise evaluation later.
317        let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
318            (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
319        });
320
321        if let Some(pk_filter) = pk_filter {
322            pk_filters.push(pk_filter);
323        } else {
324            remaining_simple_filters.push(filter_ctx);
325        }
326    }
327
328    BulkFilterPlan {
329        remaining_simple_filters,
330        pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
331    }
332}
333
/// Splits a query [`Predicate`] into a [`ReaderFilterPlan`]: predicates that can run
/// during the prefilter pass (on a reduced projection, to compute a refined row
/// selection) versus predicates that must run on the normal read path (alongside the
/// full projection).
///
/// Expressions that lower to a [`SimpleFilterContext`] are routed as follows:
/// - `Matched` / `Pruned` outcomes always stay on the normal read path.
/// - Timestamp filters, tag filters when tags are stored as raw columns, and field
///   filters when `pre_filter_mode` is [`PreFilterMode::All`] are evaluated directly
///   in the prefilter pass.
/// - Tag filters are matched against the encoded primary key when tags are not
///   stored as raw columns.
/// - Field filters in any other mode stay on the normal read path.
///
/// The prefilter pass is *best-effort pruning*. A physical-filter predicate is silently
/// dropped in three cases:
/// - [`PhysicalFilterContext::new_opt`] returns `None` (column not in the projected
///   arrow schema);
/// - its column cannot be read directly during the prefilter pass in the current
///   mode/format;
/// - no prefilter builder ends up being created.
///
/// This is safe because the DataFusion `FilterExec` above the reader always
/// re-applies the original predicate, so the prefilter pass is purely a pruning hint.
///
/// Tag and timestamp predicates that lower to [`SimpleFilterEvaluator`] are an
/// exception. The engine enforces them precisely, so they are never silently dropped.
/// When a prefilter builder is created they execute only in the prefilter pass.
/// Otherwise they are moved back into `remaining_simple_filters`.
///
/// # Panics
///
/// Panics if a simple filter selected for direct prefiltering names a column that is
/// missing from `read_format.arrow_schema()`.
pub(crate) fn build_reader_filter_plan(
    predicate: Option<&Predicate>,
    expected_metadata: Option<&RegionMetadata>,
    pre_filter_mode: PreFilterMode,
    read_format: &FlatReadFormat,
    codec: &Arc<dyn PrimaryKeyCodec>,
) -> ReaderFilterPlan {
    // No predicate: nothing to filter and nothing to prefilter.
    let Some(predicate) = predicate else {
        return ReaderFilterPlan {
            remaining_simple_filters: Vec::new(),
            prefilter_builder: None,
        };
    };

    let metadata = read_format.metadata();
    let mut prefilter_simple_filters = Vec::new();
    let mut remaining_simple_filters = Vec::new();
    let mut prefilter_physical_filters = Vec::new();
    // `primary_key_filters` and `pk_filter_contexts` are pushed together, so they
    // describe the same tag predicates (evaluator vs. full context).
    let mut primary_key_filters = Vec::new();
    let mut pk_filter_contexts = Vec::new();

    // `SkipFields` keeps field predicates in the normal read path to avoid a
    // second read of projected field columns, while tags/timestamp can still
    // participate in prefiltering.
    let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
    // When true, tag columns are encoded in the primary key column and are NOT
    // stored as separate parquet columns. Tag predicates must go through PK
    // decoding rather than direct column reads.
    let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();

    // Whether a column can be read directly from parquet for prefiltering,
    // based on its semantic type and the current mode/format.
    let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
        match semantic_type {
            SemanticType::Tag => !need_pk_prefilter,
            SemanticType::Field => field_prefilter_enabled,
            SemanticType::Timestamp => true,
        }
    };

    for expr in predicate.exprs() {
        // Prefer cheap simple filters first. They also preserve `Matched` /
        // `Pruned` states for columns that only exist in expected metadata.
        if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
            // `Matched` and `Pruned` come from expected-metadata compatibility
            // and must stay in the main filter list so later phases keep that
            // outcome.
            let Some(filter) = filter_ctx.filter().as_filter() else {
                remaining_simple_filters.push(filter_ctx);
                continue;
            };

            // If the column is stored as a separate parquet column and is already projected in the main read,
            // we can evaluate the simple filter directly during prefilter.
            let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
            if direct_prefilter {
                // Invariant check: a directly prefiltered column must exist in the
                // arrow schema used to build the prefilter projection.
                assert!(
                    read_format
                        .arrow_schema()
                        .column_with_name(filter.column_name())
                        .is_some(),
                    "Column '{}' is not present in the arrow schema {:?}",
                    filter.column_name(),
                    read_format.arrow_schema(),
                );
                prefilter_simple_filters.push(filter_ctx);
                continue;
            }

            // Otherwise try to filter through encoded-PK matching.
            // Only field filters with field prefiltering disabled reach the
            // `else` branch: tags are either direct or PK-encoded here.
            if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
                primary_key_filters.push(filter.clone());
                pk_filter_contexts.push(filter_ctx);
            } else {
                remaining_simple_filters.push(filter_ctx);
            }
            continue;
        }

        // Best-effort physical-filter prefilter (see fn-level doc): `new_opt`
        // returning `None` means the column is not in the projected arrow
        // schema, and dropping the predicate is safe because the upper
        // `FilterExec` re-applies it. Physical filters whose column cannot be
        // read directly in the current mode/format are dropped the same way.
        if let Some(filter) =
            PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
            && can_direct_prefilter(filter.semantic_type())
        {
            prefilter_physical_filters.push(filter);
        }
    }

    // Sorted so the strings do not depend on predicate order.
    // NOTE(review): presumably this keeps prefilter cache keys stable — confirm.
    let pk_filter_expr_strs = (!pk_filter_contexts.is_empty()).then(|| {
        let mut expr_strs = pk_filter_contexts
            .iter()
            .map(|filter_ctx| filter_ctx.expr_str().to_string())
            .collect::<Vec<_>>();
        expr_strs.sort();
        SmallVec::from_vec(expr_strs)
    });
    let pk_filter_exprs =
        (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
    // Prefer the schema version from `expected_metadata` when provided,
    // otherwise fall back to the read format's metadata.
    let schema_version = expected_metadata
        .map(|metadata| metadata.schema_version)
        .unwrap_or_else(|| read_format.metadata().schema_version);
    // `prefilter_simple_filters` is cloned because the builder takes ownership,
    // while the originals are needed below if the builder returns `None`.
    let prefilter_builder = PrefilterContextBuilder::new(
        read_format,
        codec,
        pk_filter_exprs,
        pk_filter_expr_strs,
        prefilter_simple_filters.clone(),
        prefilter_physical_filters,
        schema_version,
    );

    if prefilter_builder.is_some() {
        // Simple and PK filters routed into the builder are not repeated on the
        // normal read path.
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder,
        }
    } else {
        // If prefilter setup is not worthwhile, keep the original simple
        // filters on the normal path so behavior is unchanged.
        remaining_simple_filters.extend(prefilter_simple_filters);
        remaining_simple_filters.extend(pk_filter_contexts);
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder: None,
        }
    }
}
478
479/// Context for prefiltering a row group.
480pub(crate) struct PrefilterContext {
481    /// Optional PK filter for legacy primary-key-format parquet.
482    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
483    /// Simple filters that can be evaluated directly from the prefilter batch.
484    filters: Vec<SimpleFilterContext>,
485    /// Physical filters that can be evaluated directly from the prefilter batch.
486    /// Physical expressions are only applied in the prefilter phase.
487    physical_filters: Vec<PhysicalFilterContext>,
488    /// Region schema version used in per-filter cache keys.
489    schema_version: u64,
490    /// Sorted expression strings for the encoded-PK filter group.
491    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
492    /// Arrow schema used to build narrowed prefilter projections.
493    arrow_schema: SchemaRef,
494}
495
496/// Pre-built state for constructing [PrefilterContext] per row group.
497///
498/// Fields invariant across row groups (projection mask, codec, metadata, filters)
499/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
500/// is created via [PrefilterContextBuilder::build()] for each row group.
501pub(crate) struct PrefilterContextBuilder {
502    pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
503    pk_filter_expr_strs: Option<SmallVec<[String; 1]>>,
504    filters: Vec<SimpleFilterContext>,
505    physical_filters: Vec<PhysicalFilterContext>,
506    codec: Arc<dyn PrimaryKeyCodec>,
507    metadata: RegionMetadataRef,
508    schema_version: u64,
509    arrow_schema: SchemaRef,
510}
511
512impl PrefilterContextBuilder {
513    /// Creates a builder if prefiltering is applicable.
514    ///
515    /// Returns `None` if:
516    /// - The read format doesn't use flat layout
517    /// - No prefilter columns are selected
518    /// - Prefilter would read the full projection without any PK filter
519    pub(crate) fn new(
520        read_format: &FlatReadFormat,
521        codec: &Arc<dyn PrimaryKeyCodec>,
522        primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
523        primary_key_filter_expr_strs: Option<SmallVec<[String; 1]>>,
524        filters: Vec<SimpleFilterContext>,
525        physical_filters: Vec<PhysicalFilterContext>,
526        schema_version: u64,
527    ) -> Option<Self> {
528        let metadata = read_format.metadata();
529        let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
530        let pk_filters = (!use_raw_tag_columns)
531            .then_some(primary_key_filters)
532            .flatten()
533            .filter(|filters| !filters.is_empty());
534        let pk_filter_expr_strs = pk_filters
535            .is_some()
536            .then_some(primary_key_filter_expr_strs)
537            .flatten();
538
539        let mut prefilter_column_names = HashSet::new();
540        for filter_ctx in &filters {
541            if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
542                prefilter_column_names.insert(filter.column_name().to_string());
543            }
544        }
545
546        if pk_filters.is_some() {
547            prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
548        }
549
550        for filter_ctx in &physical_filters {
551            prefilter_column_names.insert(filter_ctx.column_name().to_string());
552        }
553
554        let prefilter_count =
555            compute_projection_count(&prefilter_column_names, read_format.arrow_schema());
556
557        if prefilter_count == 0 {
558            return None;
559        }
560
561        let total_count = read_format.parquet_read_columns().root_indices().len();
562        let remaining_count = total_count.saturating_sub(prefilter_count);
563        if pk_filters.is_none() && prefilter_count >= total_count {
564            return None;
565        }
566
567        if pk_filters.is_none()
568            && !should_use_prefilter(prefilter_count, remaining_count, total_count)
569        {
570            return None;
571        }
572
573        Some(Self {
574            pk_filters,
575            pk_filter_expr_strs,
576            filters,
577            physical_filters,
578            codec: Arc::clone(codec),
579            metadata: metadata.clone(),
580            schema_version,
581            arrow_schema: read_format.arrow_schema().clone(),
582        })
583    }
584
585    /// Builds a [PrefilterContext] for a specific row group.
586    pub(crate) fn build(&self) -> PrefilterContext {
587        let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
588            let pk_filter = self
589                .codec
590                .primary_key_filter(&self.metadata, Arc::clone(pk_filters));
591            Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
592        });
593        PrefilterContext {
594            pk_filter,
595            filters: self.filters.clone(),
596            physical_filters: self.physical_filters.clone(),
597            schema_version: self.schema_version,
598            pk_filter_expr_strs: self.pk_filter_expr_strs.clone(),
599            arrow_schema: self.arrow_schema.clone(),
600        }
601    }
602}
603
/// Largest fraction of the read columns the prefilter projection may cover for
/// a prefilter pass (without a PK filter) to be considered worthwhile.
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
/// Minimum number of columns that must remain for the second read.
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;
606
607/// Result of prefiltering a row group.
608pub(crate) struct PrefilterResult {
609    /// Refined row selection after prefiltering.
610    pub(crate) refined_selection: RowSelection,
611    /// Number of rows filtered out by prefiltering.
612    pub(crate) filtered_rows: usize,
613}
614
615/// Executes prefiltering on a row group.
616///
617/// Reads only the prefilter columns (currently the PK dictionary column),
618/// applies filters, and returns a refined [RowSelection].
619fn compute_projection_mask(
620    column_names: &HashSet<String>,
621    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
622    parquet_schema: &SchemaDescriptor,
623) -> ProjectionMask {
624    ProjectionMask::roots(
625        parquet_schema,
626        projection_indices(column_names, arrow_schema),
627    )
628}
629
630fn compute_projection_count(
631    column_names: &HashSet<String>,
632    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
633) -> usize {
634    projection_indices(column_names, arrow_schema).len()
635}
636
637fn projection_indices(
638    column_names: &HashSet<String>,
639    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
640) -> Vec<usize> {
641    let mut projection_indices: Vec<usize> = column_names
642        .iter()
643        .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
644        .collect();
645    projection_indices.sort_unstable();
646    projection_indices.dedup();
647    projection_indices
648}
649
650fn should_use_prefilter(
651    prefilter_count: usize,
652    remaining_count: usize,
653    total_count: usize,
654) -> bool {
655    if remaining_count == 0 {
656        return false;
657    }
658
659    if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
660        return false;
661    }
662
663    let ratio = prefilter_count as f64 / total_count as f64;
664    ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
665}
666
667pub(crate) async fn execute_prefilter(
668    prefilter_ctx: &mut PrefilterContext,
669    reader_builder: &RowGroupReaderBuilder,
670    build_ctx: &RowGroupBuildContext<'_>,
671) -> Result<PrefilterResult> {
672    let entries = build_prefilter_cache_entries(prefilter_ctx, reader_builder, build_ctx);
673
674    if entries.is_empty() {
675        return execute_prefilter_by_reading_columns(prefilter_ctx, reader_builder, build_ctx)
676            .await;
677    }
678
679    execute_prefilter_with_result_cache(prefilter_ctx, reader_builder, build_ctx, entries).await
680}
681
682async fn execute_prefilter_with_result_cache(
683    prefilter_ctx: &mut PrefilterContext,
684    reader_builder: &RowGroupReaderBuilder,
685    build_ctx: &RowGroupBuildContext<'_>,
686    entries: Vec<PrefilterEntry>,
687) -> Result<PrefilterResult> {
688    let non_cacheable_physical = non_cacheable_physical_filters(prefilter_ctx);
689    let mut hit_mask: Option<BooleanBuffer> = None;
690    let mut misses = Vec::new();
691    for entry in entries {
692        let Some(key) = &entry.key else {
693            misses.push(entry);
694            continue;
695        };
696
697        if let Some(mask) = reader_builder.cache_strategy().get_prefilter_result(key) {
698            hit_mask = Some(match hit_mask {
699                Some(hit_mask) => hit_mask.bitand(mask.as_ref()),
700                None => mask.as_ref().clone(),
701            });
702        } else {
703            misses.push(entry);
704        }
705    }
706
707    if misses.is_empty() && non_cacheable_physical.is_empty() {
708        let combined_mask = hit_mask.unwrap_or_else(|| BooleanBuffer::new_set(0));
709        let refined_selection =
710            refined_selection_from_mask(&combined_mask, &build_ctx.row_selection);
711        let rows_before_filter = rows_before_filter(reader_builder, build_ctx);
712        let filtered_rows = rows_before_filter.saturating_sub(refined_selection.row_count());
713        return Ok(PrefilterResult {
714            refined_selection,
715            filtered_rows,
716        });
717    }
718
719    let mut uncached_entries = misses;
720    uncached_entries.extend(
721        non_cacheable_physical
722            .iter()
723            .copied()
724            .map(|idx| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
725    );
726    let (uncached_mask, read_rows) =
727        build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &uncached_entries).await?;
728
729    let final_mask = match (hit_mask, uncached_mask) {
730        (Some(hit_mask), Some(uncached_mask)) => hit_mask.bitand(&uncached_mask),
731        (Some(hit_mask), None) => hit_mask,
732        (None, Some(uncached_mask)) => uncached_mask,
733        (None, None) => BooleanBuffer::new_set(read_rows),
734    };
735    debug_assert_eq!(final_mask.len(), read_rows);
736    let rows_selected = final_mask.count_set_bits();
737    let filtered_rows = read_rows.saturating_sub(rows_selected);
738    let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
739
740    Ok(PrefilterResult {
741        refined_selection,
742        filtered_rows,
743    })
744}
745
746fn non_cacheable_physical_filters(prefilter_ctx: &PrefilterContext) -> Vec<usize> {
747    prefilter_ctx
748        .physical_filters
749        .iter()
750        .enumerate()
751        .filter_map(|(idx, filter)| (!filter.is_immutable()).then_some(idx))
752        .collect()
753}
754
755async fn build_prefilter_masks(
756    prefilter_ctx: &mut PrefilterContext,
757    reader_builder: &RowGroupReaderBuilder,
758    build_ctx: &RowGroupBuildContext<'_>,
759    entries: &[PrefilterEntry],
760) -> Result<(Option<BooleanBuffer>, usize)> {
761    let prefilter_column_names = prefilter_column_names_for_entries(prefilter_ctx, entries);
762    let parquet_schema = reader_builder
763        .parquet_metadata()
764        .file_metadata()
765        .schema_descr();
766    let projection = compute_projection_mask(
767        &prefilter_column_names,
768        &prefilter_ctx.arrow_schema,
769        parquet_schema,
770    );
771
772    let mut stream = reader_builder
773        .build_with_projection(
774            build_ctx.row_group_idx,
775            build_ctx.row_selection.clone(),
776            projection,
777            build_ctx.fetch_metrics,
778        )
779        .await?;
780
781    let mut cache_builders = entries
782        .iter()
783        .map(|entry| entry.key.is_some().then(|| BooleanBufferBuilder::new(0)))
784        .collect::<Vec<_>>();
785    let mut combined_builder = (!entries.is_empty()).then(|| BooleanBufferBuilder::new(0));
786    let mut rows_before_filter = 0usize;
787
788    while let Some(batch_result) = stream.next().await {
789        let batch = batch_result?;
790        let num_rows = batch.num_rows();
791        if num_rows == 0 {
792            continue;
793        }
794        rows_before_filter += num_rows;
795
796        let mut batch_mask = BooleanBuffer::new_set(num_rows);
797        for (idx, entry) in entries.iter().enumerate() {
798            let mask = eval_entry_mask(
799                &batch,
800                prefilter_ctx,
801                entry.kind,
802                reader_builder.file_path(),
803            )?;
804            batch_mask = batch_mask.bitand(&mask);
805            if let Some(Some(builder)) = cache_builders.get_mut(idx) {
806                builder.append_buffer(&mask);
807            }
808        }
809        if let Some(builder) = &mut combined_builder {
810            builder.append_buffer(&batch_mask);
811        }
812    }
813
814    for (entry, builder) in entries.iter().zip(cache_builders) {
815        if let (Some(key), Some(mut builder)) = (&entry.key, builder) {
816            reader_builder
817                .cache_strategy()
818                .put_prefilter_result(key.clone(), Arc::new(builder.finish()));
819        }
820    }
821
822    Ok((
823        combined_builder.map(|mut builder| builder.finish()),
824        rows_before_filter,
825    ))
826}
827
828fn prefilter_column_names_for_entries(
829    prefilter_ctx: &PrefilterContext,
830    entries: &[PrefilterEntry],
831) -> HashSet<String> {
832    let mut prefilter_column_names = HashSet::new();
833    for entry in entries {
834        match entry.kind {
835            PrefilterEntryKind::Simple(idx) => {
836                if let MaybeFilter::Filter(filter) = prefilter_ctx.filters[idx].filter() {
837                    prefilter_column_names.insert(filter.column_name().to_string());
838                }
839            }
840            PrefilterEntryKind::Physical(idx) => {
841                prefilter_column_names.insert(
842                    prefilter_ctx.physical_filters[idx]
843                        .column_name()
844                        .to_string(),
845                );
846            }
847            PrefilterEntryKind::PkGroup => {
848                prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
849            }
850        }
851    }
852    prefilter_column_names
853}
854
855async fn execute_prefilter_by_reading_columns(
856    prefilter_ctx: &mut PrefilterContext,
857    reader_builder: &RowGroupReaderBuilder,
858    build_ctx: &RowGroupBuildContext<'_>,
859) -> Result<PrefilterResult> {
860    let entries = all_prefilter_entries(prefilter_ctx);
861    let (mask, rows_before_filter) =
862        build_prefilter_masks(prefilter_ctx, reader_builder, build_ctx, &entries).await?;
863
864    let final_mask = mask.unwrap_or_else(|| BooleanBuffer::new_set(rows_before_filter));
865    let rows_selected = final_mask.count_set_bits();
866    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
867    let refined_selection = refined_selection_from_mask(&final_mask, &build_ctx.row_selection);
868
869    Ok(PrefilterResult {
870        refined_selection,
871        filtered_rows,
872    })
873}
874
875fn all_prefilter_entries(prefilter_ctx: &PrefilterContext) -> Vec<PrefilterEntry> {
876    let mut entries = Vec::new();
877    if prefilter_ctx.pk_filter.is_some() {
878        entries.push(PrefilterEntry::without_cache(PrefilterEntryKind::PkGroup));
879    }
880    entries.extend(
881        prefilter_ctx
882            .filters
883            .iter()
884            .enumerate()
885            .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Simple(idx))),
886    );
887    entries.extend(
888        prefilter_ctx
889            .physical_filters
890            .iter()
891            .enumerate()
892            .map(|(idx, _)| PrefilterEntry::without_cache(PrefilterEntryKind::Physical(idx))),
893    );
894    entries
895}
896
/// Identifies which prefilter a `PrefilterEntry` evaluates.
///
/// `Debug`/`PartialEq`/`Eq` are derived so entry kinds can be logged and
/// compared in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PrefilterEntryKind {
    /// Index into `PrefilterContext::filters`.
    Simple(usize),
    /// Index into `PrefilterContext::physical_filters`.
    Physical(usize),
    /// The primary key filter in `PrefilterContext::pk_filter`, evaluated
    /// against the encoded primary key column.
    PkGroup,
}
903
904struct PrefilterEntry {
905    kind: PrefilterEntryKind,
906    key: Option<PrefilterKey>,
907}
908
909impl PrefilterEntry {
910    fn without_cache(kind: PrefilterEntryKind) -> Self {
911        Self { kind, key: None }
912    }
913}
914
915fn build_prefilter_cache_entries(
916    prefilter_ctx: &PrefilterContext,
917    reader_builder: &RowGroupReaderBuilder,
918    build_ctx: &RowGroupBuildContext<'_>,
919) -> Vec<PrefilterEntry> {
920    let row_selection = PrefilterKey::row_selection_snapshot(build_ctx.row_selection.as_ref());
921    let file_id = reader_builder.file_handle().file_id().file_id();
922    let row_group_idx = build_ctx.row_group_idx as u32;
923    let mut entries = Vec::new();
924
925    for (idx, filter_ctx) in prefilter_ctx.filters.iter().enumerate() {
926        entries.push(PrefilterEntry {
927            kind: PrefilterEntryKind::Simple(idx),
928            key: Some(PrefilterKey::new(
929                file_id,
930                row_group_idx,
931                row_selection.clone(),
932                prefilter_ctx.schema_version,
933                smallvec![filter_ctx.expr_str().to_string()],
934            )),
935        });
936    }
937
938    for (idx, filter_ctx) in prefilter_ctx.physical_filters.iter().enumerate() {
939        if !filter_ctx.is_immutable() {
940            continue;
941        }
942        entries.push(PrefilterEntry {
943            kind: PrefilterEntryKind::Physical(idx),
944            key: Some(PrefilterKey::new(
945                file_id,
946                row_group_idx,
947                row_selection.clone(),
948                prefilter_ctx.schema_version,
949                smallvec![filter_ctx.expr_str().to_string()],
950            )),
951        });
952    }
953
954    if prefilter_ctx.pk_filter.is_some()
955        && let Some(exprs) = &prefilter_ctx.pk_filter_expr_strs
956    {
957        entries.push(PrefilterEntry {
958            kind: PrefilterEntryKind::PkGroup,
959            key: Some(PrefilterKey::new(
960                file_id,
961                row_group_idx,
962                row_selection,
963                prefilter_ctx.schema_version,
964                exprs.clone(),
965            )),
966        });
967    }
968
969    entries
970}
971
972fn rows_before_filter(
973    reader_builder: &RowGroupReaderBuilder,
974    build_ctx: &RowGroupBuildContext<'_>,
975) -> usize {
976    build_ctx.row_selection.as_ref().map_or_else(
977        || {
978            reader_builder
979                .parquet_metadata()
980                .row_group(build_ctx.row_group_idx)
981                .num_rows() as usize
982        },
983        RowSelection::row_count,
984    )
985}
986
987fn refined_selection_from_mask(
988    mask: &BooleanBuffer,
989    original_selection: &Option<RowSelection>,
990) -> RowSelection {
991    if mask.is_empty() || mask.count_set_bits() == 0 {
992        return RowSelection::from(vec![]);
993    }
994
995    let prefilter_selection = RowSelection::from_filters(&[BooleanArray::from(mask.clone())]);
996    match original_selection {
997        Some(original) => original.and_then(&prefilter_selection),
998        None => prefilter_selection,
999    }
1000}
1001
1002fn eval_entry_mask(
1003    batch: &RecordBatch,
1004    prefilter_ctx: &mut PrefilterContext,
1005    kind: PrefilterEntryKind,
1006    file_path: &str,
1007) -> Result<BooleanBuffer> {
1008    match kind {
1009        PrefilterEntryKind::Simple(idx) => {
1010            eval_simple_filter_mask(batch, &prefilter_ctx.filters[idx], file_path)
1011        }
1012        PrefilterEntryKind::Physical(idx) => {
1013            eval_physical_filter_mask(batch, &prefilter_ctx.physical_filters[idx], file_path)
1014        }
1015        PrefilterEntryKind::PkGroup => {
1016            let pk_filter = prefilter_ctx.pk_filter.as_mut().context(UnexpectedSnafu {
1017                reason: "Missing primary key filter for prefilter cache entry",
1018            })?;
1019            eval_pk_group_mask(batch, pk_filter.as_mut())
1020        }
1021    }
1022}
1023
1024fn eval_pk_group_mask(
1025    batch: &RecordBatch,
1026    pk_filter: &mut dyn PrimaryKeyFilter,
1027) -> Result<BooleanBuffer> {
1028    let (pk_column_index, _) = batch
1029        .schema()
1030        .column_with_name(PRIMARY_KEY_COLUMN_NAME)
1031        .context(UnexpectedSnafu {
1032            reason: "Primary key column not found in prefilter batch",
1033        })?;
1034    let matched_row_ranges = matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter)?;
1035    let mut builder = BooleanBufferBuilder::new(batch.num_rows());
1036    builder.append_n(batch.num_rows(), false);
1037    for range in matched_row_ranges {
1038        for row in range {
1039            builder.set_bit(row, true);
1040        }
1041    }
1042    Ok(builder.finish())
1043}
1044
1045fn eval_simple_filter_mask(
1046    batch: &RecordBatch,
1047    filter_ctx: &SimpleFilterContext,
1048    file_path: &str,
1049) -> Result<BooleanBuffer> {
1050    let filter = match filter_ctx.filter() {
1051        MaybeFilter::Filter(filter) => filter,
1052        MaybeFilter::Matched => return Ok(BooleanBuffer::new_set(batch.num_rows())),
1053        MaybeFilter::Pruned => return Ok(BooleanBuffer::new_unset(batch.num_rows())),
1054    };
1055
1056    let (idx, _) = batch
1057        .schema()
1058        .column_with_name(filter.column_name())
1059        .with_context(|| UnexpectedSnafu {
1060            reason: format!(
1061                "Prefilter column '{}' (id {}) not found in batch for file {}",
1062                filter.column_name(),
1063                filter_ctx.column_id(),
1064                file_path
1065            ),
1066        })?;
1067    let column = batch.column(idx).clone();
1068    filter.evaluate_array(&column).context(RecordBatchSnafu)
1069}
1070
1071fn eval_physical_filter_mask(
1072    batch: &RecordBatch,
1073    filter_ctx: &PhysicalFilterContext,
1074    file_path: &str,
1075) -> Result<BooleanBuffer> {
1076    let filter = filter_ctx.filter();
1077
1078    let (idx, _) = batch
1079        .schema()
1080        .column_with_name(filter_ctx.column_name())
1081        .with_context(|| UnexpectedSnafu {
1082            reason: format!(
1083                "Prefilter physical column '{}' (id {}) not found in batch for file {}",
1084                filter_ctx.column_name(),
1085                filter_ctx.column_id(),
1086                file_path
1087            ),
1088        })?;
1089    let column = batch.column(idx).clone();
1090
1091    let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
1092        .context(NewRecordBatchSnafu)?;
1093    let evaluated = filter
1094        .evaluate(&record_batch)
1095        .context(EvalPartitionFilterSnafu)?;
1096    let array = evaluated
1097        .into_array(record_batch.num_rows())
1098        .context(EvalPartitionFilterSnafu)?;
1099    let boolean_array = array
1100        .as_any()
1101        .downcast_ref::<BooleanArray>()
1102        .context(UnexpectedSnafu {
1103            reason: "Failed to downcast physical filter result to BooleanArray",
1104        })?;
1105    // Treat null results as false (filtered out); value bits are not guaranteed
1106    // to be false for invalid entries.
1107    let mut result = boolean_array.values().clone();
1108    if let Some(nulls) = boolean_array.nulls() {
1109        result = result.bitand(nulls.inner());
1110    }
1111    Ok(result)
1112}
1113
1114#[cfg(test)]
1115mod tests {
1116    use std::sync::Arc;
1117    use std::sync::atomic::{AtomicUsize, Ordering};
1118
1119    use common_recordbatch::filter::SimpleFilterEvaluator;
1120    use datafusion_expr::{col, lit};
1121    use datatypes::arrow::array::{
1122        ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
1123    };
1124    use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
1125    use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
1126    use store_api::codec::PrimaryKeyEncoding;
1127
1128    use super::*;
1129    use crate::read::read_columns::ReadColumns;
1130    use crate::sst::internal_fields;
1131    use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
1132    use crate::test_util::sst_util::{
1133        new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
1134        sst_region_metadata_with_encoding,
1135    };
1136
1137    struct CountingPrimaryKeyFilter {
1138        hits: Arc<AtomicUsize>,
1139        expected: Vec<u8>,
1140    }
1141
1142    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
1143        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
1144            self.hits.fetch_add(1, Ordering::Relaxed);
1145            Ok(pk == self.expected.as_slice())
1146        }
1147    }
1148
1149    #[test]
1150    fn test_cached_primary_key_filter_reuses_previous_result() {
1151        let expected = new_primary_key(&["a", "x"]);
1152        let hits = Arc::new(AtomicUsize::new(0));
1153        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
1154            hits: Arc::clone(&hits),
1155            expected: expected.clone(),
1156        }));
1157
1158        assert!(filter.matches(expected.as_slice()).unwrap());
1159        assert!(filter.matches(expected.as_slice()).unwrap());
1160        assert!(
1161            !filter
1162                .matches(new_primary_key(&["b", "x"]).as_slice())
1163                .unwrap()
1164        );
1165
1166        assert_eq!(hits.load(Ordering::Relaxed), 2);
1167    }
1168
1169    fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
1170        exprs
1171            .iter()
1172            .filter_map(SimpleFilterEvaluator::try_new)
1173            .collect()
1174    }
1175
1176    fn new_simple_filter_contexts(
1177        metadata: &RegionMetadataRef,
1178        exprs: &[datafusion_expr::Expr],
1179    ) -> Vec<SimpleFilterContext> {
1180        exprs
1181            .iter()
1182            .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
1183            .collect()
1184    }
1185
1186    fn new_physical_filter_contexts(
1187        metadata: &RegionMetadataRef,
1188        read_format: &FlatReadFormat,
1189        exprs: &[datafusion_expr::Expr],
1190    ) -> Vec<PhysicalFilterContext> {
1191        exprs
1192            .iter()
1193            .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
1194            .collect()
1195    }
1196
1197    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1198        assert_eq!(primary_keys.len(), field_values.len());
1199
1200        let metadata = Arc::new(sst_region_metadata());
1201        let arrow_schema = metadata.schema.arrow_schema();
1202        let field_column = arrow_schema
1203            .field(arrow_schema.index_of("field_0").unwrap())
1204            .clone();
1205        let time_index_column = arrow_schema
1206            .field(arrow_schema.index_of("ts").unwrap())
1207            .clone();
1208        let mut fields = vec![field_column, time_index_column];
1209        fields.extend(
1210            internal_fields()
1211                .into_iter()
1212                .map(|field| field.as_ref().clone()),
1213        );
1214        let schema = Arc::new(Schema::new(fields));
1215
1216        let mut dict_values = Vec::new();
1217        let mut keys = Vec::with_capacity(primary_keys.len());
1218        for pk in primary_keys {
1219            let key = dict_values
1220                .iter()
1221                .position(|existing: &&[u8]| existing == pk)
1222                .unwrap_or_else(|| {
1223                    dict_values.push(*pk);
1224                    dict_values.len() - 1
1225                });
1226            keys.push(key as u32);
1227        }
1228        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1229            UInt32Array::from(keys),
1230            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1231        ));
1232
1233        RecordBatch::try_new(
1234            schema,
1235            vec![
1236                Arc::new(UInt64Array::from(field_values.to_vec())),
1237                Arc::new(TimestampMillisecondArray::from_iter_values(
1238                    0..primary_keys.len() as i64,
1239                )),
1240                pk_array,
1241                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
1242                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
1243            ],
1244        )
1245        .unwrap()
1246    }
1247
1248    fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1249        assert_eq!(primary_keys.len(), field_values.len());
1250
1251        let metadata = Arc::new(sst_region_metadata());
1252        let arrow_schema = metadata.schema.arrow_schema();
1253        let field_column = arrow_schema
1254            .field(arrow_schema.index_of("field_0").unwrap())
1255            .clone();
1256        let time_index_column = arrow_schema
1257            .field(arrow_schema.index_of("ts").unwrap())
1258            .clone();
1259        let schema = Arc::new(Schema::new(vec![
1260            field_column,
1261            time_index_column,
1262            internal_fields()[0].as_ref().clone(),
1263        ]));
1264
1265        let mut dict_values = Vec::new();
1266        let mut keys = Vec::with_capacity(primary_keys.len());
1267        for pk in primary_keys {
1268            let key = dict_values
1269                .iter()
1270                .position(|existing: &&[u8]| existing == pk)
1271                .unwrap_or_else(|| {
1272                    dict_values.push(*pk);
1273                    dict_values.len() - 1
1274                });
1275            keys.push(key as u32);
1276        }
1277        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
1278            UInt32Array::from(keys),
1279            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
1280        ));
1281
1282        RecordBatch::try_new(
1283            schema,
1284            vec![
1285                Arc::new(UInt64Array::from(field_values.to_vec())),
1286                Arc::new(TimestampMillisecondArray::from_iter_values(
1287                    0..primary_keys.len() as i64,
1288                )),
1289                pk_array,
1290            ],
1291        )
1292        .unwrap()
1293    }
1294
1295    fn new_prefilter_batch_binary_pk(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
1296        assert_eq!(primary_keys.len(), field_values.len());
1297
1298        let metadata = Arc::new(sst_region_metadata());
1299        let arrow_schema = metadata.schema.arrow_schema();
1300        let field_column = arrow_schema
1301            .field(arrow_schema.index_of("field_0").unwrap())
1302            .clone();
1303        let time_index_column = arrow_schema
1304            .field(arrow_schema.index_of("ts").unwrap())
1305            .clone();
1306        let schema = Arc::new(Schema::new(vec![
1307            field_column,
1308            time_index_column,
1309            Field::new(PRIMARY_KEY_COLUMN_NAME, DataType::Binary, false),
1310        ]));
1311
1312        let pk_array: ArrayRef =
1313            Arc::new(BinaryArray::from_iter_values(primary_keys.iter().copied()));
1314
1315        RecordBatch::try_new(
1316            schema,
1317            vec![
1318                Arc::new(UInt64Array::from(field_values.to_vec())),
1319                Arc::new(TimestampMillisecondArray::from_iter_values(
1320                    0..primary_keys.len() as i64,
1321                )),
1322                pk_array,
1323            ],
1324        )
1325        .unwrap()
1326    }
1327
1328    fn field_values(batch: &RecordBatch) -> Vec<u64> {
1329        batch
1330            .column(0)
1331            .as_any()
1332            .downcast_ref::<UInt64Array>()
1333            .unwrap()
1334            .values()
1335            .to_vec()
1336    }
1337
1338    fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
1339        filters
1340            .iter()
1341            .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
1342            .collect()
1343    }
1344
1345    #[test]
1346    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
1347        let metadata = Arc::new(sst_region_metadata());
1348        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
1349        let mut primary_key_filter =
1350            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
1351        let pk_a = new_primary_key(&["a", "x"]);
1352        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
1353        let pk_col_idx = primary_key_column_index(batch.num_columns());
1354
1355        let filtered =
1356            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
1357                .unwrap();
1358
1359        assert!(filtered.is_none());
1360    }
1361
1362    #[test]
1363    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
1364        let metadata = Arc::new(sst_region_metadata());
1365        let filters = Arc::new(new_test_filters(&[col("tag_0")
1366            .eq(lit("a"))
1367            .or(col("tag_0").eq(lit("c")))]));
1368        let mut primary_key_filter =
1369            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
1370        let pk_a = new_primary_key(&["a", "x"]);
1371        let pk_b = new_primary_key(&["b", "x"]);
1372        let pk_c = new_primary_key(&["c", "x"]);
1373        let pk_d = new_primary_key(&["d", "x"]);
1374        let batch = new_raw_batch(
1375            &[
1376                pk_a.as_slice(),
1377                pk_a.as_slice(),
1378                pk_b.as_slice(),
1379                pk_b.as_slice(),
1380                pk_c.as_slice(),
1381                pk_c.as_slice(),
1382                pk_d.as_slice(),
1383                pk_d.as_slice(),
1384            ],
1385            &[10, 11, 12, 13, 14, 15, 16, 17],
1386        );
1387        let pk_col_idx = primary_key_column_index(batch.num_columns());
1388
1389        let filtered =
1390            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
1391                .unwrap()
1392                .unwrap();
1393
1394        assert_eq!(filtered.num_rows(), 4);
1395        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
1396    }
1397
1398    #[test]
1399    fn test_prefilter_builder_returns_none_without_selected_filters() {
1400        let metadata: RegionMetadataRef =
1401            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
1402        let read_format = FlatReadFormat::new(
1403            metadata.clone(),
1404            ReadColumns::from_deduped_column_ids(
1405                metadata.column_metadatas.iter().map(|c| c.column_id),
1406            ),
1407            None,
1408            "test",
1409            false,
1410        )
1411        .unwrap();
1412        let codec = build_primary_key_codec(metadata.as_ref());
1413
1414        let builder = PrefilterContextBuilder::new(
1415            &read_format,
1416            &codec,
1417            None,
1418            None,
1419            Vec::new(),
1420            Vec::new(),
1421            metadata.schema_version,
1422        );
1423        assert!(builder.is_none());
1424    }
1425
1426    #[test]
1427    fn test_should_use_prefilter() {
1428        assert!(should_use_prefilter(1, 5, 6));
1429        assert!(!should_use_prefilter(1, 0, 1));
1430        assert!(!should_use_prefilter(1, 1, 2));
1431        assert!(!should_use_prefilter(4, 3, 7));
1432        assert!(should_use_prefilter(3, 3, 6));
1433    }
1434
1435    #[test]
1436    fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
1437        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
1438            PrimaryKeyEncoding::Sparse,
1439        ));
1440        let legacy_read_format = FlatReadFormat::new(
1441            metadata.clone(),
1442            ReadColumns::from_deduped_column_ids(
1443                metadata.column_metadatas.iter().map(|c| c.column_id),
1444            ),
1445            None,
1446            "memtable",
1447            false,
1448        )
1449        .unwrap();
1450        assert!(!legacy_read_format.batch_has_raw_pk_columns());
1451
1452        let plan = build_bulk_filter_plan(
1453            &legacy_read_format,
1454            Some(&Predicate::new(vec![
1455                col("tag_0").eq(lit("a")),
1456                col("field_0").gt(lit(1_u64)),
1457            ])),
1458        );
1459        assert_eq!(
1460            plan.pk_filters.as_ref().map(|filters| filters.len()),
1461            Some(1)
1462        );
1463        assert_eq!(
1464            remaining_simple_filter_columns(&plan.remaining_simple_filters),
1465            vec!["field_0"]
1466        );
1467
1468        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1469        let raw_pk_read_format = FlatReadFormat::new(
1470            metadata.clone(),
1471            ReadColumns::from_deduped_column_ids(
1472                metadata.column_metadatas.iter().map(|c| c.column_id),
1473            ),
1474            None,
1475            "memtable",
1476            true,
1477        )
1478        .unwrap();
1479        assert!(raw_pk_read_format.batch_has_raw_pk_columns());
1480
1481        let tag_only_plan = build_bulk_filter_plan(
1482            &raw_pk_read_format,
1483            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
1484        );
1485        assert!(tag_only_plan.pk_filters.is_none());
1486        assert_eq!(
1487            remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
1488            vec!["tag_0"]
1489        );
1490
1491        let field_only_plan = build_bulk_filter_plan(
1492            &raw_pk_read_format,
1493            Some(&Predicate::new(vec![col("field_0").gt(lit(1_u64))])),
1494        );
1495        assert!(field_only_plan.pk_filters.is_none());
1496        assert_eq!(
1497            remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
1498            vec!["field_0"]
1499        );
1500    }
1501
1502    #[test]
1503    fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
1504        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1505        let full_read_format = FlatReadFormat::new(
1506            metadata.clone(),
1507            ReadColumns::from_deduped_column_ids(
1508                metadata.column_metadatas.iter().map(|c| c.column_id),
1509            ),
1510            None,
1511            "test",
1512            true,
1513        )
1514        .unwrap();
1515        let codec = build_primary_key_codec(metadata.as_ref());
1516
1517        let skip_fields_plan = build_reader_filter_plan(
1518            Some(&Predicate::new(vec![
1519                col("tag_0").eq(lit("a")),
1520                col("field_0").gt(lit(1_u64)),
1521            ])),
1522            None,
1523            PreFilterMode::SkipFields,
1524            &full_read_format,
1525            &codec,
1526        );
1527        assert!(skip_fields_plan.prefilter_builder.is_some());
1528        assert_eq!(
1529            remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
1530            vec!["field_0"]
1531        );
1532
1533        let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
1534        let ts = metadata.time_index_column().column_id;
1535        let projected_read_format = FlatReadFormat::new(
1536            metadata.clone(),
1537            ReadColumns::from_deduped_column_ids([field_0, ts]),
1538            None,
1539            "test",
1540            true,
1541        )
1542        .unwrap();
1543        let pk_prefilter_plan = build_reader_filter_plan(
1544            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
1545            None,
1546            PreFilterMode::All,
1547            &projected_read_format,
1548            &codec,
1549        );
1550        assert!(pk_prefilter_plan.prefilter_builder.is_some());
1551        assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
1552    }
1553
1554    #[test]
1555    fn test_pk_filter_expr_strings_are_stable_under_expr_order() {
1556        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
1557            PrimaryKeyEncoding::Sparse,
1558        ));
1559        let read_format = FlatReadFormat::new(
1560            metadata.clone(),
1561            ReadColumns::from_deduped_column_ids(
1562                metadata.column_metadatas.iter().map(|c| c.column_id),
1563            ),
1564            None,
1565            "test",
1566            false,
1567        )
1568        .unwrap();
1569        let codec = build_primary_key_codec(metadata.as_ref());
1570
1571        let expr_a = col("tag_0").eq(lit("a"));
1572        let expr_b = col("tag_1").eq(lit("x"));
1573        let plan_ab = build_reader_filter_plan(
1574            Some(&Predicate::new(vec![expr_a.clone(), expr_b.clone()])),
1575            None,
1576            PreFilterMode::All,
1577            &read_format,
1578            &codec,
1579        );
1580        let plan_b_a = build_reader_filter_plan(
1581            Some(&Predicate::new(vec![expr_b, expr_a])),
1582            None,
1583            PreFilterMode::All,
1584            &read_format,
1585            &codec,
1586        );
1587
1588        let exprs_ab = plan_ab.prefilter_builder.unwrap().pk_filter_expr_strs;
1589        let exprs_b_a = plan_b_a.prefilter_builder.unwrap().pk_filter_expr_strs;
1590        assert!(exprs_ab.is_some());
1591        assert_eq!(exprs_ab, exprs_b_a);
1592    }
1593
1594    #[test]
1595    fn test_simple_and_physical_contexts_preserve_expr_strings() {
1596        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1597        let read_format = FlatReadFormat::new(
1598            metadata.clone(),
1599            ReadColumns::from_deduped_column_ids(
1600                metadata.column_metadatas.iter().map(|c| c.column_id),
1601            ),
1602            None,
1603            "test",
1604            true,
1605        )
1606        .unwrap();
1607
1608        let simple_expr = col("tag_0").eq(lit("a"));
1609        let simple = SimpleFilterContext::new_opt(&metadata, None, &simple_expr).unwrap();
1610        assert_eq!(simple.expr_str(), format!("{simple_expr:?}"));
1611
1612        let physical_expr = col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false);
1613        let physical =
1614            PhysicalFilterContext::new_opt(&metadata, None, &read_format, &physical_expr).unwrap();
1615        assert_eq!(physical.expr_str(), format!("{physical_expr:?}"));
1616    }
1617
1618    #[test]
1619    fn test_eval_simple_filter_mask_uses_flat_tag_columns_directly() {
1620        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1621        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
1622        let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);
1623
1624        let mask = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap();
1625        assert_eq!(mask.count_set_bits(), 4);
1626    }
1627
1628    #[test]
1629    fn test_eval_simple_filter_mask_errors_on_missing_selected_column() {
1630        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
1631        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
1632        let pk = new_primary_key(&["a", "x"]);
1633        let batch = new_raw_batch(&[pk.as_slice()], &[10]);
1634
1635        let err = eval_simple_filter_mask(&batch, &filters[0], "test").unwrap_err();
1636        let err = err.to_string();
1637        assert!(err.contains("Prefilter column"));
1638        assert!(err.contains("tag_0"));
1639    }
1640
1641    #[test]
1642    fn test_eval_physical_filter_mask_evaluates_physical_filters() {
1643        let metadata: RegionMetadataRef =
1644            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
1645        let read_format = FlatReadFormat::new(
1646            metadata.clone(),
1647            ReadColumns::from_deduped_column_ids(
1648                metadata.column_metadatas.iter().map(|c| c.column_id),
1649            ),
1650            None,
1651            "test",
1652            false,
1653        )
1654        .unwrap();
1655        let expr = col("field_0").in_list(vec![lit(11_u64)], false);
1656        let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[expr]);
1657        let pk = new_primary_key(&["a", "x"]);
1658        let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]);
1659
1660        let mask = eval_physical_filter_mask(&batch, &physical_filters[0], "test").unwrap();
1661        assert_eq!(mask.count_set_bits(), 1);
1662    }
1663
1664    #[test]
1665    fn test_eval_pk_group_mask_finds_pk_column_by_name() {
1666        let metadata = Arc::new(sst_region_metadata());
1667        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
1668        let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
1669            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters),
1670        )) as Box<dyn PrimaryKeyFilter>);
1671        let pk_a = new_primary_key(&["a", "x"]);
1672        let pk_b = new_primary_key(&["b", "x"]);
1673        let batch = new_prefilter_batch(
1674            &[
1675                pk_a.as_slice(),
1676                pk_a.as_slice(),
1677                pk_b.as_slice(),
1678                pk_b.as_slice(),
1679            ],
1680            &[10, 11, 12, 13],
1681        );
1682
1683        let mask = eval_pk_group_mask(&batch, pk_filter.as_mut().unwrap().as_mut()).unwrap();
1684
1685        assert_eq!(mask.count_set_bits(), 2);
1686    }
1687
1688    #[test]
1689    fn test_eval_pk_group_mask_handles_binary_pk_column() {
1690        let metadata = Arc::new(sst_region_metadata());
1691        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
1692        let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
1693            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters),
1694        )) as Box<dyn PrimaryKeyFilter>);
1695        let pk_a = new_primary_key(&["a", "x"]);
1696        let pk_b = new_primary_key(&["b", "x"]);
1697        let batch = new_prefilter_batch_binary_pk(
1698            &[
1699                pk_a.as_slice(),
1700                pk_a.as_slice(),
1701                pk_b.as_slice(),
1702                pk_b.as_slice(),
1703            ],
1704            &[10, 11, 12, 13],
1705        );
1706
1707        let mask = eval_pk_group_mask(&batch, pk_filter.as_mut().unwrap().as_mut()).unwrap();
1708
1709        assert_eq!(mask.count_set_bits(), 2);
1710    }
1711}