// mito2/sst/parquet/prefilter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prefilter framework for parquet reader.
16//!
17//! Prefilter optimization reduces I/O by reading only a subset of columns first
18//! (the prefilter phase), applying filters to compute a refined row selection,
19//! then reading the remaining columns with the refined selection.
20
21use std::ops::Range;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use datatypes::arrow::array::BinaryArray;
27use datatypes::arrow::record_batch::RecordBatch;
28use futures::StreamExt;
29use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
30use parquet::arrow::ProjectionMask;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::schema::types::SchemaDescriptor;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35
36use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
37use crate::sst::parquet::flat_format::primary_key_column_index;
38use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
39use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
40use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;
41
42pub(crate) fn matching_row_ranges_by_primary_key(
43    input: &RecordBatch,
44    pk_column_index: usize,
45    pk_filter: &mut dyn PrimaryKeyFilter,
46) -> Result<Vec<Range<usize>>> {
47    let pk_dict_array = input
48        .column(pk_column_index)
49        .as_any()
50        .downcast_ref::<PrimaryKeyArray>()
51        .context(UnexpectedSnafu {
52            reason: "Primary key column is not a dictionary array",
53        })?;
54    let pk_values = pk_dict_array
55        .values()
56        .as_any()
57        .downcast_ref::<BinaryArray>()
58        .context(UnexpectedSnafu {
59            reason: "Primary key values are not binary array",
60        })?;
61    let keys = pk_dict_array.keys();
62    let key_values = keys.values();
63
64    if key_values.is_empty() {
65        return Ok(std::iter::once(0..input.num_rows()).collect());
66    }
67
68    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
69    let mut start = 0;
70    while start < key_values.len() {
71        let key = key_values[start];
72        let mut end = start + 1;
73        while end < key_values.len() && key_values[end] == key {
74            end += 1;
75        }
76
77        if pk_filter
78            .matches(pk_values.value(key as usize))
79            .context(DecodeSnafu)?
80        {
81            if let Some(last) = matched_row_ranges.last_mut()
82                && last.end == start
83            {
84                last.end = end;
85            } else {
86                matched_row_ranges.push(start..end);
87            }
88        }
89
90        start = end;
91    }
92
93    Ok(matched_row_ranges)
94}
95
96/// Returns whether a filter can be applied by parquet primary-key prefiltering.
97///
98/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates
99/// on the partition column.
100pub(crate) fn is_usable_primary_key_filter(
101    sst_metadata: &RegionMetadataRef,
102    expected_metadata: Option<&RegionMetadata>,
103    filter: &SimpleFilterEvaluator,
104) -> bool {
105    let sst_column = match expected_metadata {
106        Some(expected_metadata) => {
107            let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
108            else {
109                return false;
110            };
111            let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
112                return false;
113            };
114
115            if sst_column.column_schema.name != expected_column.column_schema.name
116                || sst_column.semantic_type != expected_column.semantic_type
117                || sst_column.column_schema.data_type != expected_column.column_schema.data_type
118            {
119                return false;
120            }
121
122            sst_column
123        }
124        None => {
125            let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
126                return false;
127            };
128            sst_column
129        }
130    };
131
132    sst_column.semantic_type == SemanticType::Tag
133        && sst_metadata
134            .primary_key_index(sst_column.column_id)
135            .is_some()
136}
137
/// A [PrimaryKeyFilter] wrapper that caches the result for the most recently
/// evaluated primary key, so repeated checks of the same key skip the inner
/// filter.
pub(crate) struct CachedPrimaryKeyFilter {
    /// The wrapped filter that performs the actual evaluation.
    inner: Box<dyn PrimaryKeyFilter>,
    /// The primary key bytes from the last cache-filling evaluation.
    last_primary_key: Vec<u8>,
    /// Result for `last_primary_key`; `None` until the first evaluation.
    last_match: Option<bool>,
}
143
144impl CachedPrimaryKeyFilter {
145    pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
146        Self {
147            inner,
148            last_primary_key: Vec::new(),
149            last_match: None,
150        }
151    }
152}
153
154impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
155    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
156        if let Some(last_match) = self.last_match
157            && self.last_primary_key == pk
158        {
159            return Ok(last_match);
160        }
161
162        let matched = self.inner.matches(pk)?;
163        self.last_primary_key.clear();
164        self.last_primary_key.extend_from_slice(pk);
165        self.last_match = Some(matched);
166        Ok(matched)
167    }
168}
169
/// Context for prefiltering a row group.
///
/// Currently supports primary key (PK) filtering only.
/// Will be extended with simple column filters and physical filters in the future.
pub(crate) struct PrefilterContext {
    /// PK filter instance. Built as a [CachedPrimaryKeyFilter] so that runs
    /// of identical keys are evaluated once.
    pk_filter: Box<dyn PrimaryKeyFilter>,
    /// Projection mask for reading only the PK column.
    pk_projection: ProjectionMask,
    /// Index of the PK column within the prefilter projection batch.
    /// This is 0 when we project only the PK column.
    pk_column_index: usize,
}
183
/// Pre-built state for constructing [PrefilterContext] per row group.
///
/// Fields invariant across row groups (projection mask, codec, metadata, filters)
/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
/// is created via [PrefilterContextBuilder::build()] for each row group.
pub(crate) struct PrefilterContextBuilder {
    /// Projection mask selecting only the PK column.
    pk_projection: ProjectionMask,
    /// Index of the PK column in the projected batch (0, as the PK is the
    /// only projected column).
    pk_column_index: usize,
    /// Codec used to build a fresh PK filter for each row group.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the region the SST belongs to.
    metadata: RegionMetadataRef,
    /// Simple filters on primary key columns, shared across row groups.
    pk_filters: Arc<Vec<SimpleFilterEvaluator>>,
}
196
197impl PrefilterContextBuilder {
198    /// Creates a builder if prefiltering is applicable.
199    ///
200    /// Returns `None` if:
201    /// - No primary key filters are available
202    /// - The read format doesn't use flat layout with dictionary-encoded PKs
203    /// - The primary key is empty
204    pub(crate) fn new(
205        read_format: &ReadFormat,
206        codec: &Arc<dyn PrimaryKeyCodec>,
207        primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
208        parquet_schema: &SchemaDescriptor,
209    ) -> Option<Self> {
210        let pk_filters = primary_key_filters?;
211        if pk_filters.is_empty() {
212            return None;
213        }
214
215        let metadata = read_format.metadata();
216        if metadata.primary_key.is_empty() {
217            return None;
218        }
219
220        // Only flat format with dictionary-encoded PKs supports PK prefiltering.
221        let flat_format = read_format.as_flat()?;
222        if !flat_format.raw_batch_has_primary_key_dictionary() {
223            return None;
224        }
225
226        // Compute PK-only projection mask.
227        let num_parquet_columns = parquet_schema.num_columns();
228        let pk_index = primary_key_column_index(num_parquet_columns);
229        let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]);
230
231        // The PK column is the only column in the projection, so its index is 0.
232        let pk_column_index = 0;
233
234        Some(Self {
235            pk_projection,
236            pk_column_index,
237            codec: Arc::clone(codec),
238            metadata: metadata.clone(),
239            pk_filters: Arc::clone(pk_filters),
240        })
241    }
242
243    /// Builds a [PrefilterContext] for a specific row group.
244    pub(crate) fn build(&self) -> PrefilterContext {
245        // Parquet PK prefilter always supports the partition column. Only
246        // PartitionTreeMemtable skips it after partition pruning.
247        let pk_filter =
248            self.codec
249                .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false);
250        let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter));
251        PrefilterContext {
252            pk_filter,
253            pk_projection: self.pk_projection.clone(),
254            pk_column_index: self.pk_column_index,
255        }
256    }
257}
258
/// Result of prefiltering a row group.
pub(crate) struct PrefilterResult {
    /// Refined row selection after prefiltering.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows filtered out by prefiltering, i.e. the rows yielded by
    /// the prefilter read minus the rows that matched.
    pub(crate) filtered_rows: usize,
}
266
267/// Executes prefiltering on a row group.
268///
269/// Reads only the prefilter columns (currently the PK dictionary column),
270/// applies filters, and returns a refined [RowSelection].
271pub(crate) async fn execute_prefilter(
272    prefilter_ctx: &mut PrefilterContext,
273    reader_builder: &RowGroupReaderBuilder,
274    build_ctx: &RowGroupBuildContext<'_>,
275) -> Result<PrefilterResult> {
276    // Reads PK column only.
277    let mut pk_stream = reader_builder
278        .build_with_projection(
279            build_ctx.row_group_idx,
280            build_ctx.row_selection.clone(),
281            prefilter_ctx.pk_projection.clone(),
282            build_ctx.fetch_metrics,
283        )
284        .await?;
285
286    // Applies PK filter to each batch and collect matching row ranges.
287    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
288    let mut row_offset = 0;
289    let mut rows_before_filter = 0usize;
290
291    while let Some(batch_result) = pk_stream.next().await {
292        let batch = batch_result.context(ReadParquetSnafu {
293            path: reader_builder.file_path(),
294        })?;
295        let batch_num_rows = batch.num_rows();
296        if batch_num_rows == 0 {
297            continue;
298        }
299        rows_before_filter += batch_num_rows;
300
301        let ranges = matching_row_ranges_by_primary_key(
302            &batch,
303            prefilter_ctx.pk_column_index,
304            prefilter_ctx.pk_filter.as_mut(),
305        )?;
306        matched_row_ranges.extend(
307            ranges
308                .into_iter()
309                .map(|range| (range.start + row_offset)..(range.end + row_offset)),
310        );
311        row_offset += batch_num_rows;
312    }
313
314    // Converts matched ranges to RowSelection.
315    let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum();
316    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
317
318    let refined_selection = if rows_selected == 0 {
319        RowSelection::from(vec![])
320    } else {
321        // Build the prefilter selection relative to the yielded rows
322        // (not total_rows), since matched_row_ranges are offsets within
323        // the rows actually read from the stream.
324        let prefilter_selection =
325            row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter);
326
327        // Use and_then to apply prefilter selection within the context
328        // of the original selection, since prefilter offsets are relative
329        // to the original selection's selected rows.
330        match &build_ctx.row_selection {
331            Some(original) => original.and_then(&prefilter_selection),
332            None => prefilter_selection,
333        }
334    };
335
336    Ok(PrefilterResult {
337        refined_selection,
338        filtered_rows,
339    })
340}
341
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use common_recordbatch::filter::SimpleFilterEvaluator;
    use datafusion_expr::{col, lit};
    use mito_codec::row_converter::PrimaryKeyFilter;
    use store_api::codec::PrimaryKeyEncoding;

    use super::*;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding};

    // NOTE(review): the name says "skips legacy primary key batches" but the
    // body asserts the filter IS usable on a flat-format sparse-encoded
    // region — consider renaming to match the assertion.
    #[test]
    fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() {
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let read_format = ReadFormat::new_flat(
            metadata.clone(),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            None,
            "test",
            true,
        )
        .unwrap();
        assert!(read_format.as_flat().is_some());

        // A predicate on a tag column should be usable for prefiltering.
        let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap();
        assert!(is_usable_primary_key_filter(&metadata, None, &filter));
    }

    // Parquet prefilter always supports predicates on the partition column
    // (the metric-engine table-id column), unlike PartitionTreeMemtable.
    #[test]
    fn test_is_usable_primary_key_filter_supports_partition_column_by_default() {
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let filter = SimpleFilterEvaluator::try_new(
            &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)),
        )
        .unwrap();

        assert!(is_usable_primary_key_filter(&metadata, None, &filter));
    }

    // Inner filter that counts how many times `matches` is invoked, so the
    // cache's hit/miss behavior can be asserted.
    struct CountingPrimaryKeyFilter {
        hits: Arc<AtomicUsize>,
        expected: Vec<u8>,
    }

    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Ok(pk == self.expected.as_slice())
        }
    }

    #[test]
    fn test_cached_primary_key_filter_reuses_previous_result() {
        let expected = new_primary_key(&["a", "x"]);
        let hits = Arc::new(AtomicUsize::new(0));
        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
            hits: Arc::clone(&hits),
            expected: expected.clone(),
        }));

        // Second call with the same key should be served from the cache.
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(filter.matches(expected.as_slice()).unwrap());
        // A different key invalidates the cache and hits the inner filter.
        assert!(
            !filter
                .matches(new_primary_key(&["b", "x"]).as_slice())
                .unwrap()
        );

        // Two inner evaluations: first key (miss) + different key (miss).
        assert_eq!(hits.load(Ordering::Relaxed), 2);
    }
}
419}