
mito2/sst/parquet/prefilter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Prefilter framework for the parquet reader.
//!
//! Prefilter optimization reduces I/O by reading only a subset of columns first
//! (the prefilter phase), applying filters to compute a refined row selection,
//! then reading the remaining columns with the refined selection.
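//!
//! An informal sketch of how the pieces below fit together:
//! 1. [`build_reader_filter_plan`] splits predicates into prefilter filters and filters
//!    that stay on the normal read path.
//! 2. [`PrefilterContextBuilder::build`] creates a fresh [`PrefilterContext`] per row group.
//! 3. [`execute_prefilter`] reads only the prefilter columns of the row group and produces
//!    the refined [`RowSelection`] used when reading the remaining columns.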

use std::collections::HashSet;
use std::ops::{BitAnd, Range};
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::arrow::array::{Array, BinaryArray, BooleanArray, BooleanBufferBuilder};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::schema::types::SchemaDescriptor;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
    ReadParquetSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::PrimaryKeyArray;
use crate::sst::parquet::reader::{
    MaybeFilter, PhysicalFilterContext, RowGroupBuildContext, RowGroupReaderBuilder,
    SimpleFilterContext,
};

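/// Returns the row ranges in `input` whose encoded primary key matches `pk_filter`,
/// merging adjacent matching ranges. For example, with dictionary keys
/// `[0, 0, 1, 1, 0]` where only key `0` matches, the result is `[0..2, 4..5]`.
/// If the dictionary has no keys, the whole batch is treated as matching.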
pub(crate) fn matching_row_ranges_by_primary_key(
    input: &RecordBatch,
    pk_column_index: usize,
    pk_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Vec<Range<usize>>> {
    let pk_dict_array = input
        .column(pk_column_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .context(UnexpectedSnafu {
            reason: "Primary key column is not a dictionary array",
        })?;
    let pk_values = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .context(UnexpectedSnafu {
            reason: "Primary key values are not binary array",
        })?;
    let keys = pk_dict_array.keys();
    let key_values = keys.values();

    if key_values.is_empty() {
        return Ok(std::iter::once(0..input.num_rows()).collect());
    }

    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
    let mut start = 0;
    while start < key_values.len() {
        let key = key_values[start];
        let mut end = start + 1;
        while end < key_values.len() && key_values[end] == key {
            end += 1;
        }

        if pk_filter
            .matches(pk_values.value(key as usize))
            .context(DecodeSnafu)?
        {
            if let Some(last) = matched_row_ranges.last_mut()
                && last.end == start
            {
                last.end = end;
            } else {
                matched_row_ranges.push(start..end);
            }
        }

        start = end;
    }

    Ok(matched_row_ranges)
}

/// Filters a flat-format record batch by primary key, returning only rows whose
/// primary key matches the filter. Returns `None` if all rows are filtered out.
pub(crate) fn prefilter_flat_batch_by_primary_key(
    input: RecordBatch,
    pk_column_index: usize,
    pk_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Option<RecordBatch>> {
    if input.num_rows() == 0 {
        return Ok(Some(input));
    }

    let matched_row_ranges =
        matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
    if matched_row_ranges.is_empty() {
        return Ok(None);
    }

    if matched_row_ranges.len() == 1
        && matched_row_ranges[0].start == 0
        && matched_row_ranges[0].end == input.num_rows()
    {
        return Ok(Some(input));
    }

    if matched_row_ranges.len() == 1 {
        let span = &matched_row_ranges[0];
        return Ok(Some(input.slice(span.start, span.end - span.start)));
    }

    let mut builder = BooleanBufferBuilder::new(input.num_rows());
    builder.append_n(input.num_rows(), false);
    for span in matched_row_ranges {
        for i in span {
            builder.set_bit(i, true);
        }
    }

    let filtered = datatypes::arrow::compute::filter_record_batch(
        &input,
        &BooleanArray::new(builder.finish(), None),
    )
    .context(ComputeArrowSnafu)?;
    if filtered.num_rows() == 0 {
        Ok(None)
    } else {
        Ok(Some(filtered))
    }
}

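/// A [`PrimaryKeyFilter`] wrapper that caches the result for the most recently seen
/// encoded primary key, so consecutive lookups with the same key skip re-decoding it.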
pub(crate) struct CachedPrimaryKeyFilter {
    inner: Box<dyn PrimaryKeyFilter>,
    last_primary_key: Vec<u8>,
    last_match: Option<bool>,
}

impl CachedPrimaryKeyFilter {
    pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
        Self {
            inner,
            last_primary_key: Vec::new(),
            last_match: None,
        }
    }
}

impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
        if let Some(last_match) = self.last_match
            && self.last_primary_key == pk
        {
            return Ok(last_match);
        }

        let matched = self.inner.matches(pk)?;
        self.last_primary_key.clear();
        self.last_primary_key.extend_from_slice(pk);
        self.last_match = Some(matched);
        Ok(matched)
    }
}

/// How the bulk-memtable read should apply each predicate.
///
/// Unlike the parquet reader, the bulk path has no prefilter pass; predicates
/// either run row-wise inside the iterator or are pushed down to encoded-PK
/// matching when the batch still carries the primary-key column.
pub(crate) struct BulkFilterPlan {
    /// Simple filters the iterator still has to evaluate row-wise on each batch.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Tag predicates lowered to encoded-PK filters. `None` when the batch
    /// already exposes raw tag columns or there are no tag predicates.
    pub(crate) pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}

/// How the parquet reader should apply each predicate.
///
/// The reader runs in two phases. Predicates routed into `prefilter_builder`
/// execute on a reduced column set first to compute a refined row selection;
/// `remaining_simple_filters` execute alongside the full projection on the
/// normal read path. The contract for what is precise vs best-effort is
/// documented on [`build_reader_filter_plan`].
pub(crate) struct ReaderFilterPlan {
    /// Simple filters that must run on the normal read path: predicates with
    /// `Matched` / `Pruned` outcomes (which carry expected-metadata
    /// compatibility decisions later phases rely on), and predicates whose
    /// column cannot be read directly during the prefilter pass.
    pub(crate) remaining_simple_filters: Vec<SimpleFilterContext>,
    /// Pre-built state for the prefilter pass, or `None` when prefiltering is
    /// not worthwhile (no prefilter columns selected, or the prefilter
    /// projection would cover nearly the full read).
    pub(crate) prefilter_builder: Option<PrefilterContextBuilder>,
}

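/// Builds a [`BulkFilterPlan`] from an optional [`Predicate`] for the bulk memtable read
/// path, routing tag predicates to encoded-PK matching when the batch still carries the
/// encoded primary key column.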
pub(crate) fn build_bulk_filter_plan(
    read_format: &FlatReadFormat,
    predicate: Option<&Predicate>,
) -> BulkFilterPlan {
    let metadata = read_format.metadata();
    // Bulk memtable only needs simple binary filters here. Any filter that
    // cannot be reduced to a SimpleFilterContext stays out of this fast path.
    let simple_filters: Vec<SimpleFilterContext> = predicate
        .into_iter()
        .flat_map(|predicate| {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
        })
        .collect();

    // PK prefilter only works when flat batches still carry the encoded PK
    // column. If tags have already been expanded to raw columns, the iterator
    // can apply those filters directly and there is nothing to extract here.
    if read_format.batch_has_raw_pk_columns() || metadata.primary_key.is_empty() {
        return BulkFilterPlan {
            remaining_simple_filters: simple_filters,
            pk_filters: None,
        };
    }

    let mut remaining_simple_filters = Vec::new();
    let mut pk_filters = Vec::new();

    for filter_ctx in simple_filters {
        // Split tag predicates that can be evaluated against the encoded PK
        // from filters that still need normal row-wise evaluation later.
        let pk_filter = filter_ctx.filter().as_filter().and_then(|filter| {
            (filter_ctx.semantic_type() == SemanticType::Tag).then(|| filter.clone())
        });

        if let Some(pk_filter) = pk_filter {
            pk_filters.push(pk_filter);
        } else {
            remaining_simple_filters.push(filter_ctx);
        }
    }

    BulkFilterPlan {
        remaining_simple_filters,
        pk_filters: (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)),
    }
}

/// Splits a query [`Predicate`] into a [`ReaderFilterPlan`]: predicates that can run
/// during the prefilter pass (on a reduced projection, to compute a refined row
/// selection) versus predicates that must run on the normal read path (alongside the
/// full projection).
///
/// The prefilter pass is *best-effort pruning*: a physical-filter predicate is silently
/// dropped when [`PhysicalFilterContext::new_opt`] returns `None` (column not in the
/// projected arrow schema). This is safe because the DataFusion `FilterExec` above the
/// reader always re-applies the original predicate, so the prefilter pass is purely a
/// pruning hint.
///
/// Tag and timestamp predicates that lower to [`SimpleFilterEvaluator`] are an
/// exception — the engine enforces them precisely, so the prefilter pass is the only
/// place they execute. They are never silently dropped.
pub(crate) fn build_reader_filter_plan(
    predicate: Option<&Predicate>,
    expected_metadata: Option<&RegionMetadata>,
    pre_filter_mode: PreFilterMode,
    read_format: &FlatReadFormat,
    parquet_schema: &SchemaDescriptor,
    codec: &Arc<dyn PrimaryKeyCodec>,
) -> ReaderFilterPlan {
    let Some(predicate) = predicate else {
        return ReaderFilterPlan {
            remaining_simple_filters: Vec::new(),
            prefilter_builder: None,
        };
    };

    let metadata = read_format.metadata();
    let mut prefilter_simple_filters = Vec::new();
    let mut remaining_simple_filters = Vec::new();
    let mut prefilter_physical_filters = Vec::new();
    let mut primary_key_filters = Vec::new();
    let mut pk_filter_contexts = Vec::new();

    // `SkipFields` keeps field predicates in the normal read path to avoid a
    // second read of projected field columns, while tags/timestamp can still
    // participate in prefiltering.
    let field_prefilter_enabled = pre_filter_mode == PreFilterMode::All;
    // When true, tag columns are encoded in the primary key column and are NOT
    // stored as separate parquet columns. Tag predicates must go through PK
    // decoding rather than direct column reads.
    let need_pk_prefilter = !read_format.batch_has_raw_pk_columns();

    // Whether a column can be read directly from parquet for prefiltering,
    // based on its semantic type and the current mode/format.
    let can_direct_prefilter = |semantic_type: SemanticType| -> bool {
        match semantic_type {
            SemanticType::Tag => !need_pk_prefilter,
            SemanticType::Field => field_prefilter_enabled,
            SemanticType::Timestamp => true,
        }
    };

    for expr in predicate.exprs() {
        // Prefer cheap simple filters first. They also preserve `Matched` /
        // `Pruned` states for columns that only exist in expected metadata.
        if let Some(filter_ctx) = SimpleFilterContext::new_opt(metadata, expected_metadata, expr) {
            // `Matched` and `Pruned` come from expected-metadata compatibility
            // and must stay in the main filter list so later phases keep that
            // outcome.
            let Some(filter) = filter_ctx.filter().as_filter() else {
                remaining_simple_filters.push(filter_ctx);
                continue;
            };

            // If the column is stored as a separate parquet column and is already projected in the main read,
            // we can evaluate the simple filter directly during prefilter.
            let direct_prefilter = can_direct_prefilter(filter_ctx.semantic_type());
            if direct_prefilter {
                assert!(
                    read_format
                        .arrow_schema()
                        .column_with_name(filter.column_name())
                        .is_some(),
                    "Column '{}' is not present in the arrow schema {:?}",
                    filter.column_name(),
                    read_format.arrow_schema(),
                );
                prefilter_simple_filters.push(filter_ctx);
                continue;
            }

            // Otherwise try to filter through encoded-PK matching.
            if need_pk_prefilter && filter_ctx.semantic_type() == SemanticType::Tag {
                primary_key_filters.push(filter.clone());
                pk_filter_contexts.push(filter_ctx);
            } else {
                remaining_simple_filters.push(filter_ctx);
            }
            continue;
        }

        // Best-effort physical-filter prefilter (see fn-level doc): `new_opt`
        // returning `None` means the column is not in the projected arrow
        // schema, and dropping the predicate is safe because the upper
        // `FilterExec` re-applies it.
        if let Some(filter) =
            PhysicalFilterContext::new_opt(metadata, expected_metadata, read_format, expr)
            && can_direct_prefilter(filter.semantic_type())
        {
            prefilter_physical_filters.push(filter);
        }
    }

    let pk_filter_exprs =
        (!primary_key_filters.is_empty()).then_some(Arc::new(primary_key_filters));
    let prefilter_builder = PrefilterContextBuilder::new(
        read_format,
        codec,
        pk_filter_exprs,
        prefilter_simple_filters.clone(),
        prefilter_physical_filters,
        parquet_schema,
    );

    if prefilter_builder.is_some() {
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder,
        }
    } else {
        // If prefilter setup is not worthwhile, keep the original simple
        // filters on the normal path so behavior is unchanged.
        remaining_simple_filters.extend(prefilter_simple_filters);
        remaining_simple_filters.extend(pk_filter_contexts);
        ReaderFilterPlan {
            remaining_simple_filters,
            prefilter_builder: None,
        }
    }
}

/// Context for prefiltering a row group.
pub(crate) struct PrefilterContext {
    /// Projection mask for reading prefilter columns.
    projection: ProjectionMask,
    /// Optional PK filter for legacy primary-key-format parquet.
    pk_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Simple filters that can be evaluated directly from the prefilter batch.
    filters: Vec<SimpleFilterContext>,
    /// Physical filters that can be evaluated directly from the prefilter batch.
    /// Physical expressions are only applied in the prefilter phase.
    physical_filters: Vec<PhysicalFilterContext>,
}

/// Pre-built state for constructing [PrefilterContext] per row group.
///
/// Fields invariant across row groups (projection mask, codec, metadata, filters)
/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
/// is created via [PrefilterContextBuilder::build()] for each row group.
pub(crate) struct PrefilterContextBuilder {
    projection: ProjectionMask,
    pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    filters: Vec<SimpleFilterContext>,
    physical_filters: Vec<PhysicalFilterContext>,
    codec: Arc<dyn PrimaryKeyCodec>,
    metadata: RegionMetadataRef,
}

impl PrefilterContextBuilder {
    /// Creates a builder if prefiltering is applicable.
    ///
    /// Returns `None` if:
    /// - The read format doesn't use flat layout
    /// - No prefilter columns are selected
    /// - Prefilter would read the full projection without any PK filter
    pub(crate) fn new(
        read_format: &FlatReadFormat,
        codec: &Arc<dyn PrimaryKeyCodec>,
        primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
        filters: Vec<SimpleFilterContext>,
        physical_filters: Vec<PhysicalFilterContext>,
        parquet_schema: &SchemaDescriptor,
    ) -> Option<Self> {
        let metadata = read_format.metadata();
        let use_raw_tag_columns = read_format.batch_has_raw_pk_columns();
        let pk_filters = (!use_raw_tag_columns)
            .then_some(primary_key_filters)
            .flatten()
            .filter(|filters| !filters.is_empty());

        let mut prefilter_column_names = HashSet::new();
        for filter_ctx in &filters {
            if let MaybeFilter::Filter(filter) = filter_ctx.filter() {
                prefilter_column_names.insert(filter.column_name().to_string());
            }
        }

        if pk_filters.is_some() {
            prefilter_column_names.insert(PRIMARY_KEY_COLUMN_NAME.to_string());
        }

        for filter_ctx in &physical_filters {
            prefilter_column_names.insert(filter_ctx.column_name().to_string());
        }

        let (projection, prefilter_count) = compute_projection_mask(
            &prefilter_column_names,
            read_format.arrow_schema(),
            parquet_schema,
        );

        if prefilter_count == 0 {
            return None;
        }

        let total_count = read_format.parquet_read_columns().root_indices().len();
        let remaining_count = total_count.saturating_sub(prefilter_count);
        if pk_filters.is_none() && prefilter_count >= total_count {
            return None;
        }

        if pk_filters.is_none()
            && !should_use_prefilter(prefilter_count, remaining_count, total_count)
        {
            return None;
        }

        Some(Self {
            projection,
            pk_filters,
            filters,
            physical_filters,
            codec: Arc::clone(codec),
            metadata: metadata.clone(),
        })
    }

    /// Builds a [PrefilterContext] for a specific row group.
    pub(crate) fn build(&self) -> PrefilterContext {
        let pk_filter = self.pk_filters.as_ref().map(|pk_filters| {
            let pk_filter =
                self.codec
                    .primary_key_filter(&self.metadata, Arc::clone(pk_filters), false);
            Box::new(CachedPrimaryKeyFilter::new(pk_filter)) as Box<dyn PrimaryKeyFilter>
        });
        PrefilterContext {
            projection: self.projection.clone(),
            pk_filter,
            filters: self.filters.clone(),
            physical_filters: self.physical_filters.clone(),
        }
    }
}

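// Heuristics for `should_use_prefilter`: the extra prefilter pass must read at most
// half of the columns and leave at least two columns for the main read to pay off.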
const PREFILTER_COLUMN_RATIO_THRESHOLD: f64 = 0.5;
const PREFILTER_MIN_REMAINING_COLUMNS: usize = 2;

/// Result of prefiltering a row group.
pub(crate) struct PrefilterResult {
    /// Refined row selection after prefiltering.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows filtered out by prefiltering.
    pub(crate) filtered_rows: usize,
}

/// Computes the projection mask for the prefilter columns, returning the mask and
/// the number of columns it selects.
fn compute_projection_mask(
    column_names: &HashSet<String>,
    arrow_schema: &datatypes::arrow::datatypes::SchemaRef,
    parquet_schema: &SchemaDescriptor,
) -> (ProjectionMask, usize) {
    let mut projection_indices: Vec<usize> = column_names
        .iter()
        .filter_map(|name| arrow_schema.column_with_name(name).map(|(index, _)| index))
        .collect();
    projection_indices.sort_unstable();
    projection_indices.dedup();
    let count = projection_indices.len();
    (
        ProjectionMask::roots(parquet_schema, projection_indices.iter().copied()),
        count,
    )
}

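/// Returns whether a separate prefilter pass is worthwhile given how many columns the
/// pass would read (`prefilter_count`) out of all read columns (`total_count`). For
/// example, `should_use_prefilter(2, 6, 8)` is true (ratio 0.25 <= 0.5 with 6 remaining
/// columns), while `should_use_prefilter(5, 3, 8)` is false (ratio 0.625 > 0.5).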
fn should_use_prefilter(
    prefilter_count: usize,
    remaining_count: usize,
    total_count: usize,
) -> bool {
    if remaining_count == 0 {
        return false;
    }

    if remaining_count < PREFILTER_MIN_REMAINING_COLUMNS {
        return false;
    }

    let ratio = prefilter_count as f64 / total_count as f64;
    ratio <= PREFILTER_COLUMN_RATIO_THRESHOLD
}

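/// Executes the prefilter pass on a row group.
///
/// Reads only the prefilter columns, applies the PK/simple/physical filters, and
/// returns a refined [`RowSelection`] together with the number of rows filtered out.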
pub(crate) async fn execute_prefilter(
    prefilter_ctx: &mut PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
) -> Result<PrefilterResult> {
    let mut stream = reader_builder
        .build_with_projection(
            build_ctx.row_group_idx,
            build_ctx.row_selection.clone(),
            prefilter_ctx.projection.clone(),
            build_ctx.fetch_metrics,
        )
        .await?;

    let mut filter_arrays = Vec::new();
    let mut rows_before_filter = 0usize;
    let mut rows_selected = 0usize;

    while let Some(batch_result) = stream.next().await {
        let batch = batch_result.context(ReadParquetSnafu {
            path: reader_builder.file_path(),
        })?;
        let num_rows = batch.num_rows();
        if num_rows == 0 {
            continue;
        }
        rows_before_filter += num_rows;

        let batch_mask = match apply_filters_to_batch(
            &batch,
            &mut prefilter_ctx.pk_filter,
            &prefilter_ctx.filters,
            &prefilter_ctx.physical_filters,
            reader_builder.file_path(),
        )? {
            Some(mask) => mask,
            None => BooleanBuffer::new_unset(num_rows),
        };
        rows_selected += batch_mask.count_set_bits();
        filter_arrays.push(BooleanArray::from(batch_mask));
    }

    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
    let refined_selection = if filter_arrays.is_empty() || rows_selected == 0 {
        RowSelection::from(vec![])
    } else {
        let prefilter_selection = RowSelection::from_filters(&filter_arrays);
        match &build_ctx.row_selection {
            Some(original) => original.and_then(&prefilter_selection),
            None => prefilter_selection,
        }
    };

    Ok(PrefilterResult {
        refined_selection,
        filtered_rows,
    })
}

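/// Applies the optional PK filter, simple filters, and physical filters to a prefilter
/// batch and returns the combined row mask. Returns `None` when every row is filtered
/// out (or a `Pruned` filter discards the whole batch).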
fn apply_filters_to_batch(
    batch: &RecordBatch,
    pk_filter: &mut Option<Box<dyn PrimaryKeyFilter>>,
    filters: &[SimpleFilterContext],
    physical_filters: &[PhysicalFilterContext],
    file_path: &str,
) -> Result<Option<BooleanBuffer>> {
    let mut mask = BooleanBuffer::new_set(batch.num_rows());

    if let Some(pk_filter) = pk_filter.as_mut() {
        // Prefilter reads a reduced projection. For PK prefilter, the encoded
        // primary key column is always appended as the last projected column,
        // while `__sequence` and `__op_type` are not read.
        let pk_column_index = batch.num_columns() - 1;
        let matched_row_ranges =
            matching_row_ranges_by_primary_key(batch, pk_column_index, pk_filter.as_mut())?;
        let mut builder = BooleanBufferBuilder::new(batch.num_rows());
        builder.append_n(batch.num_rows(), false);
        for range in matched_row_ranges {
            for row in range {
                builder.set_bit(row, true);
            }
        }
        mask = mask.bitand(&builder.finish());
    }

    for filter_ctx in filters {
        let filter = match filter_ctx.filter() {
            MaybeFilter::Filter(filter) => filter,
            MaybeFilter::Matched => continue,
            MaybeFilter::Pruned => return Ok(None),
        };

        let (idx, _) = batch
            .schema()
            .column_with_name(filter.column_name())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Prefilter column '{}' (id {}) not found in batch for file {}",
                    filter.column_name(),
                    filter_ctx.column_id(),
                    file_path
                ),
            })?;
        let column = batch.column(idx).clone();
        let result = filter.evaluate_array(&column).context(RecordBatchSnafu)?;
        mask = mask.bitand(&result);
    }

    for filter_ctx in physical_filters {
        let filter = filter_ctx.filter();

        let (idx, _) = batch
            .schema()
            .column_with_name(filter_ctx.column_name())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Prefilter physical column '{}' (id {}) not found in batch for file {}",
                    filter_ctx.column_name(),
                    filter_ctx.column_id(),
                    file_path
                ),
            })?;
        let column = batch.column(idx).clone();

        let record_batch = RecordBatch::try_new(filter_ctx.schema().clone(), vec![column])
            .context(NewRecordBatchSnafu)?;
        let evaluated = filter
            .evaluate(&record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = evaluated
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array =
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .context(UnexpectedSnafu {
                    reason: "Failed to downcast physical filter result to BooleanArray",
                })?;
        // Treat null results as false (filtered out); value bits are not guaranteed
        // to be false for invalid entries.
        let mut result = boolean_array.values().clone();
        if let Some(nulls) = boolean_array.nulls() {
            result = result.bitand(nulls.inner());
        }
        mask = mask.bitand(&result);
    }

    if mask.count_set_bits() == 0 {
        Ok(None)
    } else {
        Ok(Some(mask))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use common_recordbatch::filter::SimpleFilterEvaluator;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{Schema, UInt32Type};
    use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
    use parquet::arrow::ArrowSchemaConverter;
    use store_api::codec::PrimaryKeyEncoding;

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::internal_fields;
    use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
    use crate::test_util::sst_util::{
        new_primary_key, new_record_batch_with_custom_sequence, sst_region_metadata,
        sst_region_metadata_with_encoding,
    };

    struct CountingPrimaryKeyFilter {
        hits: Arc<AtomicUsize>,
        expected: Vec<u8>,
    }

    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Ok(pk == self.expected.as_slice())
        }
    }

    #[test]
    fn test_cached_primary_key_filter_reuses_previous_result() {
        let expected = new_primary_key(&["a", "x"]);
        let hits = Arc::new(AtomicUsize::new(0));
        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
            hits: Arc::clone(&hits),
            expected: expected.clone(),
        }));

        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(
            !filter
                .matches(new_primary_key(&["b", "x"]).as_slice())
                .unwrap()
        );

        assert_eq!(hits.load(Ordering::Relaxed), 2);
    }

    fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
        exprs
            .iter()
            .filter_map(SimpleFilterEvaluator::try_new)
            .collect()
    }

    fn new_simple_filter_contexts(
        metadata: &RegionMetadataRef,
        exprs: &[datafusion_expr::Expr],
    ) -> Vec<SimpleFilterContext> {
        exprs
            .iter()
            .filter_map(|expr| SimpleFilterContext::new_opt(metadata, None, expr))
            .collect()
    }

    fn new_physical_filter_contexts(
        metadata: &RegionMetadataRef,
        read_format: &FlatReadFormat,
        exprs: &[datafusion_expr::Expr],
    ) -> Vec<PhysicalFilterContext> {
        exprs
            .iter()
            .filter_map(|expr| PhysicalFilterContext::new_opt(metadata, None, read_format, expr))
            .collect()
    }

    fn parquet_schema(read_format: &FlatReadFormat) -> SchemaDescriptor {
        ArrowSchemaConverter::new()
            .convert(read_format.arrow_schema())
            .unwrap()
    }

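    // Builds a flat-format test batch: field_0, ts, then the internal columns
    // (encoded primary key dictionary, `__sequence`, `__op_type`).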
    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        let metadata = Arc::new(sst_region_metadata());
        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let mut fields = vec![field_column, time_index_column];
        fields.extend(
            internal_fields()
                .into_iter()
                .map(|field| field.as_ref().clone()),
        );
        let schema = Arc::new(Schema::new(fields));

        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }
        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
            ],
        )
        .unwrap()
    }

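    // Builds a reduced batch mimicking the prefilter projection: field_0, ts, and the
    // encoded primary key dictionary as the last column (no `__sequence` / `__op_type`).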
    fn new_prefilter_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        let metadata = Arc::new(sst_region_metadata());
        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let schema = Arc::new(Schema::new(vec![
            field_column,
            time_index_column,
            internal_fields()[0].as_ref().clone(),
        ]));

        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }
        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
            ],
        )
        .unwrap()
    }

    fn field_values(batch: &RecordBatch) -> Vec<u64> {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    fn remaining_simple_filter_columns(filters: &[SimpleFilterContext]) -> Vec<&str> {
        filters
            .iter()
            .map(|filter_ctx| filter_ctx.filter().as_filter().unwrap().column_name())
            .collect()
    }

    #[test]
    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap();

        assert!(filtered.is_none());
    }

    #[test]
    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0")
            .eq(lit("a"))
            .or(col("tag_0").eq(lit("c")))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let pk_c = new_primary_key(&["c", "x"]);
        let pk_d = new_primary_key(&["d", "x"]);
        let batch = new_raw_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
                pk_c.as_slice(),
                pk_c.as_slice(),
                pk_d.as_slice(),
                pk_d.as_slice(),
            ],
            &[10, 11, 12, 13, 14, 15, 16, 17],
        );
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap()
                .unwrap();

        assert_eq!(filtered.num_rows(), 4);
        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
    }

    #[test]
    fn test_prefilter_builder_returns_none_without_selected_filters() {
        let metadata: RegionMetadataRef =
            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            false,
        )
        .unwrap();
        let codec = build_primary_key_codec(metadata.as_ref());
        let parquet_schema = parquet_schema(&read_format);

        let builder = PrefilterContextBuilder::new(
            &read_format,
            &codec,
            None,
            Vec::new(),
            Vec::new(),
            &parquet_schema,
        );
        assert!(builder.is_none());
    }

    #[test]
    fn test_should_use_prefilter() {
        assert!(should_use_prefilter(1, 5, 6));
        assert!(!should_use_prefilter(1, 0, 1));
        assert!(!should_use_prefilter(1, 1, 2));
        assert!(!should_use_prefilter(4, 3, 7));
        assert!(should_use_prefilter(3, 3, 6));
    }

    #[test]
    fn test_build_bulk_filter_plan_classifies_filters_across_read_paths() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let legacy_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "memtable",
            false,
        )
        .unwrap();
        assert!(!legacy_read_format.batch_has_raw_pk_columns());

        let plan = build_bulk_filter_plan(
            &legacy_read_format,
            Some(&Predicate::new(vec![
                col("tag_0").eq(lit("a")),
                col("field_0").gt(lit(1_u64)),
            ])),
        );
        assert_eq!(
            plan.pk_filters.as_ref().map(|filters| filters.len()),
            Some(1)
        );
        assert_eq!(
            remaining_simple_filter_columns(&plan.remaining_simple_filters),
            vec!["field_0"]
        );

        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let raw_pk_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "memtable",
            true,
        )
        .unwrap();
        assert!(raw_pk_read_format.batch_has_raw_pk_columns());

        let tag_only_plan = build_bulk_filter_plan(
            &raw_pk_read_format,
            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
        );
        assert!(tag_only_plan.pk_filters.is_none());
        assert_eq!(
            remaining_simple_filter_columns(&tag_only_plan.remaining_simple_filters),
            vec!["tag_0"]
        );

        let field_only_plan = build_bulk_filter_plan(
            &raw_pk_read_format,
            Some(&Predicate::new(vec![col("field_0").gt(lit(1_u64))])),
        );
        assert!(field_only_plan.pk_filters.is_none());
        assert_eq!(
            remaining_simple_filter_columns(&field_only_plan.remaining_simple_filters),
            vec!["field_0"]
        );
    }

    #[test]
    fn test_build_reader_filter_plan_classifies_filters_for_prefilter_modes() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let full_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();
        let full_parquet_schema = parquet_schema(&full_read_format);
        let codec = build_primary_key_codec(metadata.as_ref());

        let skip_fields_plan = build_reader_filter_plan(
            Some(&Predicate::new(vec![
                col("tag_0").eq(lit("a")),
                col("field_0").gt(lit(1_u64)),
            ])),
            None,
            PreFilterMode::SkipFields,
            &full_read_format,
            &full_parquet_schema,
            &codec,
        );
        assert!(skip_fields_plan.prefilter_builder.is_some());
        assert_eq!(
            remaining_simple_filter_columns(&skip_fields_plan.remaining_simple_filters),
            vec!["field_0"]
        );

        let field_0 = metadata.column_by_name("field_0").unwrap().column_id;
        let ts = metadata.time_index_column().column_id;
        let projected_read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([field_0, ts]),
            None,
            "test",
            true,
        )
        .unwrap();
        let projected_parquet_schema = parquet_schema(&projected_read_format);
        let pk_prefilter_plan = build_reader_filter_plan(
            Some(&Predicate::new(vec![col("tag_0").eq(lit("a"))])),
            None,
            PreFilterMode::All,
            &projected_read_format,
            &projected_parquet_schema,
            &codec,
        );
        assert!(pk_prefilter_plan.prefilter_builder.is_some());
        assert!(pk_prefilter_plan.remaining_simple_filters.is_empty());
    }

    #[test]
    fn test_apply_filters_to_batch_uses_flat_tag_columns_directly() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
        let batch = new_record_batch_with_custom_sequence(&["a", "x"], 0, 4, 1);

        let mut no_pk_filter = None;
        let mask = apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test")
            .unwrap()
            .unwrap();
        assert_eq!(mask.count_set_bits(), 4);
    }

    #[test]
    fn test_apply_filters_to_batch_errors_on_missing_selected_column() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = new_simple_filter_contexts(&metadata, &[col("tag_0").eq(lit("a"))]);
        let pk = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk.as_slice()], &[10]);

        let mut no_pk_filter = None;
        let err =
            apply_filters_to_batch(&batch, &mut no_pk_filter, &filters, &[], "test").unwrap_err();
        let err = err.to_string();
        assert!(err.contains("Prefilter column"));
        assert!(err.contains("tag_0"));
    }

    #[test]
    fn test_apply_filters_to_batch_evaluates_physical_filters() {
        let metadata: RegionMetadataRef =
            Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            false,
        )
        .unwrap();
        let expr = col("field_0").in_list(vec![lit(11_u64)], false);
        let physical_filters = new_physical_filter_contexts(&metadata, &read_format, &[expr]);
        let pk = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk.as_slice(), pk.as_slice(), pk.as_slice()], &[9, 10, 11]);

        let mut no_pk_filter = None;
        let mask =
            apply_filters_to_batch(&batch, &mut no_pk_filter, &[], &physical_filters, "test")
                .unwrap()
                .unwrap();
        assert_eq!(mask.count_set_bits(), 1);
    }

    #[test]
    fn test_apply_filters_to_batch_uses_last_projected_column_for_pk_prefilter() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("a"))]));
        let mut pk_filter = Some(Box::new(CachedPrimaryKeyFilter::new(
            build_primary_key_codec(metadata.as_ref())
                .primary_key_filter(&metadata, filters, false),
        )) as Box<dyn PrimaryKeyFilter>);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let batch = new_prefilter_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
            ],
            &[10, 11, 12, 13],
        );

        let mask = apply_filters_to_batch(&batch, &mut pk_filter, &[], &[], "test")
            .unwrap()
            .unwrap();

        assert_eq!(mask.count_set_bits(), 2);
    }
}