// mito2/sst/parquet/prefilter.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Prefilter framework for parquet reader.
//!
//! Prefilter optimization reduces I/O by reading only a subset of columns first
//! (the prefilter phase), applying filters to compute a refined row selection,
//! then reading the remaining columns with the refined selection.

21use std::ops::Range;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use datatypes::arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder};
27use datatypes::arrow::record_batch::RecordBatch;
28use futures::StreamExt;
29use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
30use parquet::arrow::ProjectionMask;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::schema::types::SchemaDescriptor;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35
36use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
37use crate::sst::parquet::flat_format::primary_key_column_index;
38use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
39use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
40use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;
41
42pub(crate) fn matching_row_ranges_by_primary_key(
43    input: &RecordBatch,
44    pk_column_index: usize,
45    pk_filter: &mut dyn PrimaryKeyFilter,
46) -> Result<Vec<Range<usize>>> {
47    let pk_dict_array = input
48        .column(pk_column_index)
49        .as_any()
50        .downcast_ref::<PrimaryKeyArray>()
51        .context(UnexpectedSnafu {
52            reason: "Primary key column is not a dictionary array",
53        })?;
54    let pk_values = pk_dict_array
55        .values()
56        .as_any()
57        .downcast_ref::<BinaryArray>()
58        .context(UnexpectedSnafu {
59            reason: "Primary key values are not binary array",
60        })?;
61    let keys = pk_dict_array.keys();
62    let key_values = keys.values();
63
64    if key_values.is_empty() {
65        return Ok(std::iter::once(0..input.num_rows()).collect());
66    }
67
68    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
69    let mut start = 0;
70    while start < key_values.len() {
71        let key = key_values[start];
72        let mut end = start + 1;
73        while end < key_values.len() && key_values[end] == key {
74            end += 1;
75        }
76
77        if pk_filter
78            .matches(pk_values.value(key as usize))
79            .context(DecodeSnafu)?
80        {
81            if let Some(last) = matched_row_ranges.last_mut()
82                && last.end == start
83            {
84                last.end = end;
85            } else {
86                matched_row_ranges.push(start..end);
87            }
88        }
89
90        start = end;
91    }
92
93    Ok(matched_row_ranges)
94}
95
96/// Filters a flat-format record batch by primary key, returning only rows whose
97/// primary key matches the filter. Returns `None` if all rows are filtered out.
98pub(crate) fn prefilter_flat_batch_by_primary_key(
99    input: RecordBatch,
100    pk_column_index: usize,
101    pk_filter: &mut dyn PrimaryKeyFilter,
102) -> Result<Option<RecordBatch>> {
103    if input.num_rows() == 0 {
104        return Ok(Some(input));
105    }
106
107    let matched_row_ranges =
108        matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
109    if matched_row_ranges.is_empty() {
110        return Ok(None);
111    }
112
113    if matched_row_ranges.len() == 1
114        && matched_row_ranges[0].start == 0
115        && matched_row_ranges[0].end == input.num_rows()
116    {
117        return Ok(Some(input));
118    }
119
120    if matched_row_ranges.len() == 1 {
121        let span = &matched_row_ranges[0];
122        return Ok(Some(input.slice(span.start, span.end - span.start)));
123    }
124
125    let mut builder = BooleanBufferBuilder::new(input.num_rows());
126    builder.append_n(input.num_rows(), false);
127    for span in matched_row_ranges {
128        for i in span {
129            builder.set_bit(i, true);
130        }
131    }
132
133    let filtered = datatypes::arrow::compute::filter_record_batch(
134        &input,
135        &BooleanArray::new(builder.finish(), None),
136    )
137    .context(ComputeArrowSnafu)?;
138    if filtered.num_rows() == 0 {
139        Ok(None)
140    } else {
141        Ok(Some(filtered))
142    }
143}
144
145/// Returns whether a filter can be applied by parquet primary-key prefiltering.
146///
147/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates
148/// on the partition column.
149pub(crate) fn is_usable_primary_key_filter(
150    sst_metadata: &RegionMetadataRef,
151    expected_metadata: Option<&RegionMetadata>,
152    filter: &SimpleFilterEvaluator,
153) -> bool {
154    let sst_column = match expected_metadata {
155        Some(expected_metadata) => {
156            let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
157            else {
158                return false;
159            };
160            let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
161                return false;
162            };
163
164            if sst_column.column_schema.name != expected_column.column_schema.name
165                || sst_column.semantic_type != expected_column.semantic_type
166                || sst_column.column_schema.data_type != expected_column.column_schema.data_type
167            {
168                return false;
169            }
170
171            sst_column
172        }
173        None => {
174            let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
175                return false;
176            };
177            sst_column
178        }
179    };
180
181    sst_column.semantic_type == SemanticType::Tag
182        && sst_metadata
183            .primary_key_index(sst_column.column_id)
184            .is_some()
185}
186
187pub(crate) struct CachedPrimaryKeyFilter {
188    inner: Box<dyn PrimaryKeyFilter>,
189    last_primary_key: Vec<u8>,
190    last_match: Option<bool>,
191}
192
193impl CachedPrimaryKeyFilter {
194    pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
195        Self {
196            inner,
197            last_primary_key: Vec::new(),
198            last_match: None,
199        }
200    }
201}
202
203impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
204    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
205        if let Some(last_match) = self.last_match
206            && self.last_primary_key == pk
207        {
208            return Ok(last_match);
209        }
210
211        let matched = self.inner.matches(pk)?;
212        self.last_primary_key.clear();
213        self.last_primary_key.extend_from_slice(pk);
214        self.last_match = Some(matched);
215        Ok(matched)
216    }
217}
218
/// Context for prefiltering a row group.
///
/// Currently supports primary key (PK) filtering only.
/// Will be extended with simple column filters and physical filters in the future.
pub(crate) struct PrefilterContext {
    /// PK filter instance. A fresh (cached) filter is built per row group by
    /// [PrefilterContextBuilder::build()], so cached state is not shared.
    pk_filter: Box<dyn PrimaryKeyFilter>,
    /// Projection mask for reading only the PK column.
    pk_projection: ProjectionMask,
    /// Index of the PK column within the prefilter projection batch.
    /// This is 0 when we project only the PK column.
    pk_column_index: usize,
}
232
/// Pre-built state for constructing [PrefilterContext] per row group.
///
/// Fields invariant across row groups (projection mask, codec, metadata, filters)
/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
/// is created via [PrefilterContextBuilder::build()] for each row group.
pub(crate) struct PrefilterContextBuilder {
    /// Projection mask selecting only the PK column; shared by all row groups.
    pk_projection: ProjectionMask,
    /// Index of the PK column in the projected batch (0 when it is the only
    /// projected column).
    pk_column_index: usize,
    /// Codec used to build a fresh primary key filter per row group.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Region metadata of the SST being read.
    metadata: RegionMetadataRef,
    /// Simple filters on primary key columns, shared across row groups.
    pk_filters: Arc<Vec<SimpleFilterEvaluator>>,
}
245
246impl PrefilterContextBuilder {
247    /// Creates a builder if prefiltering is applicable.
248    ///
249    /// Returns `None` if:
250    /// - No primary key filters are available
251    /// - The read format doesn't use flat layout with dictionary-encoded PKs
252    /// - The primary key is empty
253    pub(crate) fn new(
254        read_format: &ReadFormat,
255        codec: &Arc<dyn PrimaryKeyCodec>,
256        primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
257        parquet_schema: &SchemaDescriptor,
258    ) -> Option<Self> {
259        let pk_filters = primary_key_filters?;
260        if pk_filters.is_empty() {
261            return None;
262        }
263
264        let metadata = read_format.metadata();
265        if metadata.primary_key.is_empty() {
266            return None;
267        }
268
269        // Only perform PK prefiltering for primary-key-to-flat conversion path.
270        let flat_format = read_format.as_flat()?;
271        if flat_format.batch_has_raw_pk_columns() {
272            return None;
273        }
274
275        // Compute PK-only projection mask.
276        let num_parquet_columns = parquet_schema.num_columns();
277        let pk_index = primary_key_column_index(num_parquet_columns);
278        let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]);
279
280        // The PK column is the only column in the projection, so its index is 0.
281        let pk_column_index = 0;
282
283        Some(Self {
284            pk_projection,
285            pk_column_index,
286            codec: Arc::clone(codec),
287            metadata: metadata.clone(),
288            pk_filters: Arc::clone(pk_filters),
289        })
290    }
291
292    /// Builds a [PrefilterContext] for a specific row group.
293    pub(crate) fn build(&self) -> PrefilterContext {
294        // Parquet PK prefilter always supports the partition column. Only
295        // PartitionTreeMemtable skips it after partition pruning.
296        let pk_filter =
297            self.codec
298                .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false);
299        let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter));
300        PrefilterContext {
301            pk_filter,
302            pk_projection: self.pk_projection.clone(),
303            pk_column_index: self.pk_column_index,
304        }
305    }
306}
307
/// Result of prefiltering a row group.
pub(crate) struct PrefilterResult {
    /// Refined row selection after prefiltering. Empty when no row matched.
    pub(crate) refined_selection: RowSelection,
    /// Number of rows filtered out by prefiltering (rows read minus rows kept).
    pub(crate) filtered_rows: usize,
}
315
/// Executes prefiltering on a row group.
///
/// Reads only the prefilter columns (currently the PK dictionary column),
/// applies filters, and returns a refined [RowSelection].
pub(crate) async fn execute_prefilter(
    prefilter_ctx: &mut PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
) -> Result<PrefilterResult> {
    // Reads PK column only.
    let mut pk_stream = reader_builder
        .build_with_projection(
            build_ctx.row_group_idx,
            build_ctx.row_selection.clone(),
            prefilter_ctx.pk_projection.clone(),
            build_ctx.fetch_metrics,
        )
        .await?;

    // Applies PK filter to each batch and collect matching row ranges.
    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
    // Offset of the current batch within the rows yielded by the stream.
    let mut row_offset = 0;
    // Total rows yielded by the stream (i.e. rows surviving the original
    // selection), used as the domain for the prefilter selection below.
    let mut rows_before_filter = 0usize;

    while let Some(batch_result) = pk_stream.next().await {
        let batch = batch_result.context(ReadParquetSnafu {
            path: reader_builder.file_path(),
        })?;
        let batch_num_rows = batch.num_rows();
        if batch_num_rows == 0 {
            continue;
        }
        rows_before_filter += batch_num_rows;

        let ranges = matching_row_ranges_by_primary_key(
            &batch,
            prefilter_ctx.pk_column_index,
            prefilter_ctx.pk_filter.as_mut(),
        )?;
        // Shifts batch-local ranges to stream-wide offsets before collecting.
        matched_row_ranges.extend(
            ranges
                .into_iter()
                .map(|range| (range.start + row_offset)..(range.end + row_offset)),
        );
        row_offset += batch_num_rows;
    }

    // Converts matched ranges to RowSelection.
    let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum();
    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);

    let refined_selection = if rows_selected == 0 {
        // Nothing matched: an empty selection skips the remaining columns.
        RowSelection::from(vec![])
    } else {
        // Build the prefilter selection relative to the yielded rows
        // (not total_rows), since matched_row_ranges are offsets within
        // the rows actually read from the stream.
        let prefilter_selection =
            row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter);

        // Use and_then to apply prefilter selection within the context
        // of the original selection, since prefilter offsets are relative
        // to the original selection's selected rows.
        match &build_ctx.row_selection {
            Some(original) => original.and_then(&prefilter_selection),
            None => prefilter_selection,
        }
    };

    Ok(PrefilterResult {
        refined_selection,
        filtered_rows,
    })
}
390
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use common_recordbatch::filter::SimpleFilterEvaluator;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{Schema, UInt32Type};
    use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
    use store_api::codec::PrimaryKeyEncoding;

    use super::*;
    use crate::sst::internal_fields;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::sst_util::{
        new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding,
    };

    // Renamed from `..._skips_legacy_primary_key_batches`: the test asserts the
    // filter IS usable under sparse encoding with the flat read format; nothing
    // is skipped here.
    #[test]
    fn test_is_usable_primary_key_filter_with_sparse_encoding_flat_format() {
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let read_format = ReadFormat::new_flat(
            metadata.clone(),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            None,
            "test",
            true,
        )
        .unwrap();
        assert!(read_format.as_flat().is_some());

        let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap();
        assert!(is_usable_primary_key_filter(&metadata, None, &filter));
    }

    #[test]
    fn test_is_usable_primary_key_filter_supports_partition_column_by_default() {
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let filter = SimpleFilterEvaluator::try_new(
            &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)),
        )
        .unwrap();

        assert!(is_usable_primary_key_filter(&metadata, None, &filter));
    }

    /// Test filter that counts how many times the inner evaluation runs, used
    /// to verify the caching behavior of [CachedPrimaryKeyFilter].
    struct CountingPrimaryKeyFilter {
        hits: Arc<AtomicUsize>,
        expected: Vec<u8>,
    }

    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Ok(pk == self.expected.as_slice())
        }
    }

    #[test]
    fn test_cached_primary_key_filter_reuses_previous_result() {
        let expected = new_primary_key(&["a", "x"]);
        let hits = Arc::new(AtomicUsize::new(0));
        let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
            hits: Arc::clone(&hits),
            expected: expected.clone(),
        }));

        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(
            !filter
                .matches(new_primary_key(&["b", "x"]).as_slice())
                .unwrap()
        );

        // Three calls, but the second is served from the cache.
        assert_eq!(hits.load(Ordering::Relaxed), 2);
    }

    /// Converts expressions to simple filter evaluators, dropping unsupported ones.
    fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
        exprs
            .iter()
            .filter_map(SimpleFilterEvaluator::try_new)
            .collect()
    }

    /// Builds a flat-format record batch (field, ts, dictionary-encoded PK and
    /// internal columns) from per-row primary keys and field values.
    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        let metadata = Arc::new(sst_region_metadata());
        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let mut fields = vec![field_column, time_index_column];
        fields.extend(
            internal_fields()
                .into_iter()
                .map(|field| field.as_ref().clone()),
        );
        let schema = Arc::new(Schema::new(fields));

        // Deduplicates primary keys into dictionary values and per-row keys.
        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }
        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
            ],
        )
        .unwrap()
    }

    /// Extracts the `field_0` column values of a batch built by `new_raw_batch`.
    fn field_values(batch: &RecordBatch) -> Vec<u64> {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    #[test]
    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap();

        assert!(filtered.is_none());
    }

    #[test]
    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0")
            .eq(lit("a"))
            .or(col("tag_0").eq(lit("c")))]));
        let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
            .primary_key_filter(&metadata, filters, false);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let pk_c = new_primary_key(&["c", "x"]);
        let pk_d = new_primary_key(&["d", "x"]);
        let batch = new_raw_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
                pk_c.as_slice(),
                pk_c.as_slice(),
                pk_d.as_slice(),
                pk_d.as_slice(),
            ],
            &[10, 11, 12, 13, 14, 15, 16, 17],
        );
        let pk_col_idx = primary_key_column_index(batch.num_columns());

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
                .unwrap()
                .unwrap();

        assert_eq!(filtered.num_rows(), 4);
        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
    }
}