// mito2/sst/parquet/file_range.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

18use std::collections::HashMap;
19use std::ops::BitAnd;
20use std::sync::Arc;
21
22use api::v1::{OpType, SemanticType};
23use common_telemetry::error;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
26use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
27use datatypes::arrow::buffer::BooleanBuffer;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::prelude::ConcreteDataType;
30use datatypes::schema::Schema;
31use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
32use parquet::arrow::arrow_reader::RowSelection;
33use parquet::file::metadata::ParquetMetaData;
34use snafu::{OptionExt, ResultExt};
35use store_api::codec::PrimaryKeyEncoding;
36use store_api::metadata::RegionMetadataRef;
37use store_api::storage::{ColumnId, TimeSeriesRowSelector};
38use table::predicate::Predicate;
39
40use crate::cache::CacheStrategy;
41use crate::error::{
42    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
43    EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
44    UnexpectedSnafu,
45};
46use crate::read::Batch;
47use crate::read::compat::CompatBatch;
48use crate::read::flat_projection::CompactionProjectionMapper;
49use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
50use crate::read::prune::{FlatPruneReader, PruneReader};
51use crate::sst::file::FileHandle;
52use crate::sst::parquet::flat_format::{
53    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
54};
55use crate::sst::parquet::format::ReadFormat;
56use crate::sst::parquet::reader::{
57    FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder,
58    SimpleFilterContext,
59};
60use crate::sst::parquet::row_group::ParquetFetchMetrics;
61use crate::sst::parquet::stats::RowGroupPruningStats;
62
63/// Checks if a row group contains delete operations by examining the min value of op_type column.
64///
65/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
66/// or an error if the statistics are not present or cannot be decoded.
67pub(crate) fn row_group_contains_delete(
68    parquet_meta: &ParquetMetaData,
69    row_group_index: usize,
70    file_path: &str,
71) -> Result<bool> {
72    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
73
74    // safety: The last column of SST must be op_type
75    let column_metadata = &row_group_metadata.columns().last().unwrap();
76    let stats = column_metadata
77        .statistics()
78        .context(StatsNotPresentSnafu { file_path })?;
79    stats
80        .min_bytes_opt()
81        .context(StatsNotPresentSnafu { file_path })?
82        .try_into()
83        .map(i32::from_le_bytes)
84        .map(|min_op_type| min_op_type == OpType::Delete as i32)
85        .ok()
86        .context(DecodeStatsSnafu { file_path })
87}
88
/// A range of a parquet SST. Now it is a row group.
/// We can read different file ranges in parallel.
///
/// Clones share the same underlying [FileRangeContextRef].
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}
100
impl FileRange {
    /// Creates a new [FileRange] over row group `row_group_idx`.
    ///
    /// `row_selection` restricts the rows read from the group; `None` reads all rows.
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if [FileRange] selects all rows in row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        // `None` selection means the whole row group is selected.
        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Performs pruning before reading the [FileRange].
    /// It uses the latest dynamic filters with row group statistics to prune the range.
    ///
    /// Returns false if the entire range is pruned and can be skipped.
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        // Build pruning statistics for this single row group only.
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.context.base.pre_filter_mode.skip_fields(),
        );

        // not costly to create a predicate here since dynamic filters are wrapped in Arc
        let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());

        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true) // unexpected, not skip just in case
    }

    /// Returns a reader to read the [FileRange].
    ///
    /// Returns `Ok(None)` when the whole range is pruned by dynamic filters.
    /// When `selector` asks for the last row and the row group is PUT-only with
    /// all rows selected, a cached last-row reader is used instead.
    #[allow(dead_code)]
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        // Compute skip_fields once for this row group
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
                skip_fields,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No selector provided, use RowGroupReader
            false
        };

        let prune_reader = if use_last_row_reader {
            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Not eligible for the last-row fast path (no last-row selector, the row
            // group may contain DELETE, or only part of the rows are selected); use
            // the default row group reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

    /// Creates a flat reader that returns RecordBatch.
    ///
    /// Returns `Ok(None)` when the whole range is pruned by dynamic filters.
    pub(crate) async fn flat_reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        // Compute skip_fields once for this row group
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
                skip_fields,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No selector provided, use FlatRowGroupReader.
            false
        };

        let flat_prune_reader = if use_last_row_reader {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            // Flat PK prefilter makes the input stream predicate-dependent, so cached
            // selector results are not reusable across queries with different filters.
            let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() {
                CacheStrategy::Disabled
            } else {
                self.context.reader_builder.cache_strategy().clone()
            };
            let reader = FlatRowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                cache_strategy,
                self.context.read_format().projection_indices(),
                flat_row_group_reader,
            );
            FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Not eligible for the last-row fast path; read the row group directly.
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                flat_row_group_reader,
                skip_fields,
            )
        };

        Ok(Some(flat_prune_reader))
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}
310
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context (filters, formats and codecs shared with other range kinds).
    base: RangeBase,
}
318
/// Shared, reference-counted handle to a [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
320
impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true if a partition filter is configured.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the `CompatBatch` to the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
    /// Return the filtered batch. If the entire batch is filtered out, return None.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(
            input,
            skip_fields,
            // Tag predicates already applied by flat PK prefiltering are skipped inside.
            self.reader_builder.has_flat_primary_key_prefilter(),
        )
    }

    /// Returns the mode used to pre-filter columns.
    pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
        self.base.pre_filter_mode
    }

    /// Decodes parquet metadata and finds if row group contains delete op.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Creates a [RowGroupBuildContext] for building row group readers with prefiltering.
    pub(crate) fn build_context<'a>(
        &'a self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&'a ParquetFetchMetrics>,
        skip_fields: bool,
    ) -> RowGroupBuildContext<'a> {
        RowGroupBuildContext {
            filters: &self.base.filters,
            skip_fields,
            row_group_idx,
            row_selection,
            fetch_metrics,
        }
    }

    /// Returns the estimated memory size of this context.
    /// Mainly accounts for the parquet metadata size.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}
424
/// Mode to pre-filter columns in a range.
// `Eq` added alongside `PartialEq`: all variants are unit variants, so equality is
// total (clippy::derive_partial_eq_without_eq).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// Always skip fields.
    SkipFields,
}

impl PreFilterMode {
    /// Returns true if field columns should be skipped during pre-filtering.
    pub(crate) fn skip_fields(self) -> bool {
        matches!(self, Self::SkipFields)
    }
}
439
/// Context for partition expression filtering.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule. Rows for which it
    /// evaluates to false or null are filtered out.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema containing only columns referenced by the partition expression.
    /// This is used to build a minimal RecordBatch for partition filter evaluation.
    pub(crate) partition_schema: Arc<Schema>,
}
447
/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filter physical exprs.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Metadata the caller expects for the region; passed to the row group
    /// pruning statistics when pruning with dynamic filters.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used for pruning with dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional helper to project batches.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Partition filter. `None` disables partition expression filtering.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}
470
471pub(crate) struct TagDecodeState {
472    decoded_pks: Option<DecodedPrimaryKeys>,
473    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
474}
475
476impl TagDecodeState {
477    pub(crate) fn new() -> Self {
478        Self {
479            decoded_pks: None,
480            decoded_tag_cache: HashMap::new(),
481        }
482    }
483}
484
485impl RangeBase {
    /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
    /// Return the filtered batch. If the entire batch is filtered out, return None.
    ///
    /// Supported filter expr type is defined in [SimpleFilterEvaluator].
    ///
    /// When a filter is referencing primary key column, this method will decode
    /// the primary key and put it into the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run filter one by one and combine them result
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    // Decode the primary key values once and cache them on the batch.
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            // Sparse values are looked up by column id; missing tags decode to null.
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    // The batch has a single primary key, so a tag filter either keeps
                    // or drops the whole batch.
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // PK not match means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true
                    if skip_fields {
                        continue;
                    }
                    // Safety: Input is Batch so we are using primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        // Field not present in this batch; filter does not apply.
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            // AND the per-filter result into the combined row mask.
            mask = mask.bitand(&result);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        // Apply partition filter
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }
601
602    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
603    ///
604    /// It assumes all necessary tags are already decoded from the primary key.
605    ///
606    /// # Arguments
607    /// * `input` - The RecordBatch to filter
608    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode
609    pub(crate) fn precise_filter_flat(
610        &self,
611        input: RecordBatch,
612        skip_fields: bool,
613        skip_prefiltered_pk_filters: bool,
614    ) -> Result<Option<RecordBatch>> {
615        let mut tag_decode_state = TagDecodeState::new();
616        let mask = self.compute_filter_mask_flat(
617            &input,
618            skip_fields,
619            skip_prefiltered_pk_filters,
620            &mut tag_decode_state,
621        )?;
622
623        // If mask is None, the entire batch is filtered out
624        let Some(mut mask) = mask else {
625            return Ok(None);
626        };
627
628        // Apply partition filter
629        if let Some(partition_filter) = &self.partition_filter {
630            let record_batch = self.project_record_batch_for_pruning_flat(
631                &input,
632                &partition_filter.partition_schema,
633                &mut tag_decode_state,
634            )?;
635            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
636            mask = mask.bitand(&partition_mask);
637        }
638
639        if mask.count_set_bits() == 0 {
640            return Ok(None);
641        }
642
643        let filtered_batch =
644            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
645                .context(ComputeArrowSnafu)?;
646
647        if filtered_batch.num_rows() > 0 {
648            Ok(Some(filtered_batch))
649        } else {
650            Ok(None)
651        }
652    }
653
654    /// Computes the filter mask for the input RecordBatch based on pushed down predicates.
655    /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
656    ///
657    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
658    ///
659    /// # Arguments
660    /// * `input` - The RecordBatch to compute mask for
661    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode
662    pub(crate) fn compute_filter_mask_flat(
663        &self,
664        input: &RecordBatch,
665        skip_fields: bool,
666        skip_prefiltered_pk_filters: bool,
667        tag_decode_state: &mut TagDecodeState,
668    ) -> Result<Option<BooleanBuffer>> {
669        let mut mask = BooleanBuffer::new_set(input.num_rows());
670
671        let flat_format = self
672            .read_format
673            .as_flat()
674            .context(crate::error::UnexpectedSnafu {
675                reason: "Expected flat format for precise_filter_flat",
676            })?;
677        let metadata = flat_format.metadata();
678
679        // Run filter one by one and combine them result
680        for filter_ctx in &self.filters {
681            let filter = match filter_ctx.filter() {
682                MaybeFilter::Filter(f) => f,
683                // Column matches.
684                MaybeFilter::Matched => continue,
685                // Column doesn't match, filter the entire batch.
686                MaybeFilter::Pruned => return Ok(None),
687            };
688
689            // Skip field filters if skip_fields is true
690            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
691                continue;
692            }
693
694            // Flat parquet PK prefiltering already applied these tag predicates while refining
695            // row selection, so skip them here to avoid decoding/evaluating the same condition twice.
696            if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() {
697                continue;
698            }
699
700            // Get the column directly by its projected index.
701            // If the column is missing and it's not a tag/time column, this filter is skipped.
702            // Assumes the projection indices align with the input batch schema.
703            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
704            if let Some(idx) = column_idx {
705                let column = &input.columns().get(idx).unwrap();
706                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
707                mask = mask.bitand(&result);
708            } else if filter_ctx.semantic_type() == SemanticType::Tag {
709                // Column not found in projection, it may be a tag column.
710                let column_id = filter_ctx.column_id();
711
712                if let Some(tag_column) =
713                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
714                {
715                    let result = filter
716                        .evaluate_array(&tag_column)
717                        .context(RecordBatchSnafu)?;
718                    mask = mask.bitand(&result);
719                }
720            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
721                let time_index_pos = time_index_column_index(input.num_columns());
722                let column = &input.columns()[time_index_pos];
723                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
724                mask = mask.bitand(&result);
725            }
726            // Non-tag column not found in projection.
727        }
728
729        Ok(Some(mask))
730    }
731
    /// Returns the decoded tag column for `column_id`, or `None` if it's not a tag.
    ///
    /// Decoded primary keys and materialized tag columns are cached in
    /// `tag_decode_state` so repeated filters on the same batch decode only once.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        // Not part of the primary key, so it cannot be a tag encoded in the key.
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        // Reuse a tag column materialized by a previous filter on this batch.
        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        // Lazily decode all primary keys of the batch, once per batch.
        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // Sparse encoding looks tags up by column id, so the positional index is
        // dropped; dense encoding keeps it.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        // `decoded_pks` was populated above; this guard only defends the type.
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }
776
777    /// Evaluates the partition filter against the input `RecordBatch`.
778    fn evaluate_partition_filter(
779        &self,
780        record_batch: &RecordBatch,
781        partition_filter: &PartitionFilterContext,
782    ) -> Result<BooleanBuffer> {
783        let columnar_value = partition_filter
784            .region_partition_physical_expr
785            .evaluate(record_batch)
786            .context(EvalPartitionFilterSnafu)?;
787        let array = columnar_value
788            .into_array(record_batch.num_rows())
789            .context(EvalPartitionFilterSnafu)?;
790        let boolean_array =
791            array
792                .as_any()
793                .downcast_ref::<BooleanArray>()
794                .context(UnexpectedSnafu {
795                    reason: "Failed to downcast to BooleanArray".to_string(),
796                })?;
797
798        // also need to consider nulls in the partition filter result. If a value is null, it should be treated as false (filtered out).
799        let mut mask = boolean_array.values().clone();
800        if let Some(nulls) = boolean_array.nulls() {
801            mask = mask.bitand(nulls.inner());
802        }
803
804        Ok(mask)
805    }
806
807    /// Builds a `RecordBatch` from the input `Batch` matching the given schema.
808    ///
809    /// This is used for partition expression evaluation. The schema should only contain
810    /// the columns referenced by the partition expression to minimize overhead.
811    fn build_record_batch_for_pruning(
812        &self,
813        input: &mut Batch,
814        schema: &Arc<Schema>,
815    ) -> Result<RecordBatch> {
816        let arrow_schema = schema.arrow_schema();
817        let mut columns = Vec::with_capacity(arrow_schema.fields().len());
818
819        // Decode primary key if necessary.
820        if input.pk_values().is_none() {
821            input.set_pk_values(
822                self.codec
823                    .decode(input.primary_key())
824                    .context(DecodeSnafu)?,
825            );
826        }
827
828        for field in arrow_schema.fields() {
829            let metadata = self.read_format.metadata();
830            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
831
832            // Partition pruning schema should be a subset of the input batch schema.
833            let Some(column_id) = column_id else {
834                return UnexpectedSnafu {
835                    reason: format!(
836                        "Partition pruning schema expects column '{}' but it is missing in \
837                         region metadata",
838                        field.name()
839                    ),
840                }
841                .fail();
842            };
843
844            // 1. Check if it's a tag.
845            if let Some(pk_index) = metadata.primary_key_index(column_id) {
846                let pk_values = input.pk_values().unwrap();
847                let value = match pk_values {
848                    CompositeValues::Dense(v) => &v[pk_index].1,
849                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
850                };
851                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
852                let arrow_scalar = value
853                    .try_to_scalar_value(&concrete_type)
854                    .context(DataTypeMismatchSnafu)?;
855                let array = arrow_scalar
856                    .to_array_of_size(input.num_rows())
857                    .context(EvalPartitionFilterSnafu)?;
858                columns.push(array);
859            } else if metadata.time_index_column().column_id == column_id {
860                // 2. Check if it's the timestamp column.
861                columns.push(input.timestamps().to_arrow_array());
862            } else if let Some(field_index) = self
863                .read_format
864                .as_primary_key()
865                .and_then(|f| f.field_index_by_id(column_id))
866            {
867                // 3. Check if it's a field column.
868                columns.push(input.fields()[field_index].data.to_arrow_array());
869            } else {
870                return UnexpectedSnafu {
871                    reason: format!(
872                        "Partition pruning schema expects column '{}' (id {}) but it is not \
873                         present in input batch",
874                        field.name(),
875                        column_id
876                    ),
877                }
878                .fail();
879            }
880        }
881
882        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
883    }
884
885    /// Projects the input `RecordBatch` to match the given schema.
886    ///
887    /// This is used for partition expression evaluation. The schema should only contain
888    /// the columns referenced by the partition expression to minimize overhead.
889    fn project_record_batch_for_pruning_flat(
890        &self,
891        input: &RecordBatch,
892        schema: &Arc<Schema>,
893        tag_decode_state: &mut TagDecodeState,
894    ) -> Result<RecordBatch> {
895        let arrow_schema = schema.arrow_schema();
896        let mut columns = Vec::with_capacity(arrow_schema.fields().len());
897
898        let flat_format = self
899            .read_format
900            .as_flat()
901            .context(crate::error::UnexpectedSnafu {
902                reason: "Expected flat format for precise_filter_flat",
903            })?;
904        let metadata = flat_format.metadata();
905
906        for field in arrow_schema.fields() {
907            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
908
909            let Some(column_id) = column_id else {
910                return UnexpectedSnafu {
911                    reason: format!(
912                        "Partition pruning schema expects column '{}' but it is missing in \
913                         region metadata",
914                        field.name()
915                    ),
916                }
917                .fail();
918            };
919
920            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
921                columns.push(input.column(idx).clone());
922                continue;
923            }
924
925            if metadata.time_index_column().column_id == column_id {
926                let time_index_pos = time_index_column_index(input.num_columns());
927                columns.push(input.column(time_index_pos).clone());
928                continue;
929            }
930
931            if let Some(tag_column) =
932                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
933            {
934                columns.push(tag_column);
935                continue;
936            }
937
938            return UnexpectedSnafu {
939                reason: format!(
940                    "Partition pruning schema expects column '{}' (id {}) but it is not \
941                     present in projected record batch",
942                    field.name(),
943                    column_id
944                ),
945            }
946            .fail();
947        }
948
949        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
950    }
951}
952
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};

    use super::*;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};

    /// Builds a `RangeBase` over the test SST metadata with the given filters.
    fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let column_ids = metadata.column_metadatas.iter().map(|c| c.column_id);
        let read_format =
            ReadFormat::new_flat(metadata.clone(), column_ids, None, "test", true).unwrap();
        let codec = mito_codec::row_converter::build_primary_key_codec(metadata.as_ref());

        RangeBase {
            filters,
            dyn_filters: vec![],
            read_format,
            expected_metadata: None,
            prune_schema: metadata.schema.clone(),
            codec,
            compat_batch: None,
            compaction_projection_mapper: None,
            pre_filter_mode: PreFilterMode::All,
            partition_filter: None,
        }
    }

    #[test]
    fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        // One primary-key filter (tag_0 = "a") and one field filter (field_0 > 1).
        let filters = vec![
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(),
        ];
        let base = new_test_range_base(filters);
        // The batch's tags are "b"/"x", so the tag filter rejects every row.
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        // With all filters applied, no row survives.
        let full_mask = base
            .compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(0, full_mask.count_set_bits());

        // Skipping pre-filtered primary-key filters keeps the rows the
        // field filter alone accepts.
        let skipped_mask = base
            .compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(2, skipped_mask.count_set_bits());
    }
}