mito2/sst/parquet/
file_range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and functions for reading ranges from a parquet file. A file range
16//! is usually a row group in a parquet file.
17
18use std::collections::HashMap;
19use std::ops::BitAnd;
20use std::sync::Arc;
21
22use api::v1::{OpType, SemanticType};
23use common_telemetry::error;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
26use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
27use datatypes::arrow::buffer::BooleanBuffer;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::prelude::ConcreteDataType;
30use datatypes::schema::Schema;
31use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
32use parquet::arrow::arrow_reader::RowSelection;
33use parquet::file::metadata::ParquetMetaData;
34use snafu::{OptionExt, ResultExt};
35use store_api::codec::PrimaryKeyEncoding;
36use store_api::metadata::RegionMetadataRef;
37use store_api::storage::{ColumnId, TimeSeriesRowSelector};
38use table::predicate::Predicate;
39
40use crate::error::{
41    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
42    EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
43    UnexpectedSnafu,
44};
45use crate::read::Batch;
46use crate::read::compat::CompatBatch;
47use crate::read::flat_projection::CompactionProjectionMapper;
48use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
49use crate::read::prune::{FlatPruneReader, PruneReader};
50use crate::sst::file::FileHandle;
51use crate::sst::parquet::flat_format::{
52    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
53};
54use crate::sst::parquet::format::ReadFormat;
55use crate::sst::parquet::reader::{
56    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
57};
58use crate::sst::parquet::row_group::ParquetFetchMetrics;
59use crate::sst::parquet::stats::RowGroupPruningStats;
60
61/// Checks if a row group contains delete operations by examining the min value of op_type column.
62///
63/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
64/// or an error if the statistics are not present or cannot be decoded.
65pub(crate) fn row_group_contains_delete(
66    parquet_meta: &ParquetMetaData,
67    row_group_index: usize,
68    file_path: &str,
69) -> Result<bool> {
70    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
71
72    // safety: The last column of SST must be op_type
73    let column_metadata = &row_group_metadata.columns().last().unwrap();
74    let stats = column_metadata
75        .statistics()
76        .context(StatsNotPresentSnafu { file_path })?;
77    stats
78        .min_bytes_opt()
79        .context(StatsNotPresentSnafu { file_path })?
80        .try_into()
81        .map(i32::from_le_bytes)
82        .map(|min_op_type| min_op_type == OpType::Delete as i32)
83        .ok()
84        .context(DecodeStatsSnafu { file_path })
85}
86
/// A range of a parquet SST. Now it is a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of all ranges in the same SST (reader builder, filters, format).
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}
98
99impl FileRange {
100    /// Creates a new [FileRange].
101    pub(crate) fn new(
102        context: FileRangeContextRef,
103        row_group_idx: usize,
104        row_selection: Option<RowSelection>,
105    ) -> Self {
106        Self {
107            context,
108            row_group_idx,
109            row_selection,
110        }
111    }
112
113    /// Returns true if [FileRange] selects all rows in row group.
114    fn select_all(&self) -> bool {
115        let rows_in_group = self
116            .context
117            .reader_builder
118            .parquet_metadata()
119            .row_group(self.row_group_idx)
120            .num_rows();
121
122        let Some(row_selection) = &self.row_selection else {
123            return true;
124        };
125        row_selection.row_count() == rows_in_group as usize
126    }
127
128    /// Performs pruning before reading the [FileRange].
129    /// It use latest dynamic filters with row group statistics to prune the range.
130    ///
131    /// Returns false if the entire range is pruned and can be skipped.
132    fn in_dynamic_filter_range(&self) -> bool {
133        if self.context.base.dyn_filters.is_empty() {
134            return true;
135        }
136        let curr_row_group = self
137            .context
138            .reader_builder
139            .parquet_metadata()
140            .row_group(self.row_group_idx);
141        let read_format = self.context.read_format();
142        let prune_schema = &self.context.base.prune_schema;
143        let stats = RowGroupPruningStats::new(
144            std::slice::from_ref(curr_row_group),
145            read_format,
146            self.context.base.expected_metadata.clone(),
147            self.compute_skip_fields(),
148        );
149
150        // not costly to create a predicate here since dynamic filters are wrapped in Arc
151        let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());
152
153        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
154            .first()
155            .cloned()
156            .unwrap_or(true) // unexpected, not skip just in case
157    }
158
159    fn compute_skip_fields(&self) -> bool {
160        match self.context.base.pre_filter_mode {
161            PreFilterMode::All => false,
162            PreFilterMode::SkipFields => true,
163            PreFilterMode::SkipFieldsOnDelete => {
164                // Check if this specific row group contains delete op
165                row_group_contains_delete(
166                    self.context.reader_builder.parquet_metadata(),
167                    self.row_group_idx,
168                    self.context.reader_builder.file_path(),
169                )
170                .unwrap_or(true)
171            }
172        }
173    }
174
175    /// Returns a reader to read the [FileRange].
176    pub(crate) async fn reader(
177        &self,
178        selector: Option<TimeSeriesRowSelector>,
179        fetch_metrics: Option<&ParquetFetchMetrics>,
180    ) -> Result<Option<PruneReader>> {
181        if !self.in_dynamic_filter_range() {
182            return Ok(None);
183        }
184        let parquet_reader = self
185            .context
186            .reader_builder
187            .build(
188                self.row_group_idx,
189                self.row_selection.clone(),
190                fetch_metrics,
191            )
192            .await?;
193
194        let use_last_row_reader = if selector
195            .map(|s| s == TimeSeriesRowSelector::LastRow)
196            .unwrap_or(false)
197        {
198            // Only use LastRowReader if row group does not contain DELETE
199            // and all rows are selected.
200            let put_only = !self
201                .context
202                .contains_delete(self.row_group_idx)
203                .inspect_err(|e| {
204                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
205                })
206                .unwrap_or(true);
207            put_only && self.select_all()
208        } else {
209            // No selector provided, use RowGroupReader
210            false
211        };
212
213        // Compute skip_fields once for this row group
214        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
215
216        let prune_reader = if use_last_row_reader {
217            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
218            let reader = RowGroupLastRowCachedReader::new(
219                self.file_handle().file_id().file_id(),
220                self.row_group_idx,
221                self.context.reader_builder.cache_strategy().clone(),
222                RowGroupReader::new(self.context.clone(), parquet_reader),
223            );
224            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
225        } else {
226            // Row group contains DELETE, fallback to default reader.
227            PruneReader::new_with_row_group_reader(
228                self.context.clone(),
229                RowGroupReader::new(self.context.clone(), parquet_reader),
230                skip_fields,
231            )
232        };
233
234        Ok(Some(prune_reader))
235    }
236
237    /// Creates a flat reader that returns RecordBatch.
238    pub(crate) async fn flat_reader(
239        &self,
240        selector: Option<TimeSeriesRowSelector>,
241        fetch_metrics: Option<&ParquetFetchMetrics>,
242    ) -> Result<Option<FlatPruneReader>> {
243        if !self.in_dynamic_filter_range() {
244            return Ok(None);
245        }
246        let parquet_reader = self
247            .context
248            .reader_builder
249            .build(
250                self.row_group_idx,
251                self.row_selection.clone(),
252                fetch_metrics,
253            )
254            .await?;
255
256        let use_last_row_reader = if selector
257            .map(|s| s == TimeSeriesRowSelector::LastRow)
258            .unwrap_or(false)
259        {
260            // Only use LastRowReader if row group does not contain DELETE
261            // and all rows are selected.
262            let put_only = !self
263                .context
264                .contains_delete(self.row_group_idx)
265                .inspect_err(|e| {
266                    error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
267                })
268                .unwrap_or(true);
269            put_only && self.select_all()
270        } else {
271            false
272        };
273
274        // Compute skip_fields once for this row group
275        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
276
277        let flat_prune_reader = if use_last_row_reader {
278            let flat_row_group_reader =
279                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
280            let reader = FlatRowGroupLastRowCachedReader::new(
281                self.file_handle().file_id().file_id(),
282                self.row_group_idx,
283                self.context.reader_builder.cache_strategy().clone(),
284                self.context.read_format().projection_indices(),
285                flat_row_group_reader,
286            );
287            FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
288        } else {
289            let flat_row_group_reader =
290                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
291            FlatPruneReader::new_with_row_group_reader(
292                self.context.clone(),
293                flat_row_group_reader,
294                skip_fields,
295            )
296        };
297
298        Ok(Some(flat_prune_reader))
299    }
300
301    /// Returns the helper to compat batches.
302    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
303        self.context.compat_batch()
304    }
305
306    /// Returns the helper to project batches.
307    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
308        self.context.compaction_projection_mapper()
309    }
310
311    /// Returns the file handle of the file range.
312    pub(crate) fn file_handle(&self) -> &FileHandle {
313        self.context.reader_builder.file_handle()
314    }
315}
316
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

/// Shared, reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
326
impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true if a partition filter is configured.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the `CompatBatch` to the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
    /// Return the filtered batch. If the entire batch is filtered out, return None.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Determines whether to skip field filters based on PreFilterMode and row group delete status.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains delete op.
                // On error (e.g. missing statistics), conservatively skip field filters.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Decodes parquet metadata and finds if row group contains delete op.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Returns the estimated memory size of this context.
    /// Mainly accounts for the parquet metadata size.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}
417
/// Mode to pre-filter columns in a range.
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// If the range doesn't contain delete op, filters all columns.
    /// If it contains delete op, or its statistics are unavailable, skips filtering fields.
    SkipFieldsOnDelete,
    /// Always skip fields.
    SkipFields,
}
429
/// Context for partition expression filtering.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region partition rule. Evaluated against a
    /// `RecordBatch` to produce a boolean mask of rows to keep.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema containing only columns referenced by the partition expression.
    /// This is used to build a minimal RecordBatch for partition filter evaluation.
    pub(crate) partition_schema: Arc<Schema>,
}
437
/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filter physical exprs.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Region metadata used when pruning row groups with statistics.
    /// NOTE(review): presumably the metadata the caller expects rather than the
    /// SST's own metadata — confirm against `RowGroupPruningStats` usage.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used for pruning with dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional helper to project batches.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Partition filter.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}
460
/// Memoization state for decoding tag columns from encoded primary keys
/// while filtering one flat-format `RecordBatch`: primary keys are decoded
/// at most once and each tag column is materialized at most once.
pub(crate) struct TagDecodeState {
    /// Primary keys decoded from the batch; `None` until first needed.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Tag columns already materialized, keyed by column id.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}
465
466impl TagDecodeState {
467    pub(crate) fn new() -> Self {
468        Self {
469            decoded_pks: None,
470            decoded_tag_cache: HashMap::new(),
471        }
472    }
473}
474
475impl RangeBase {
    /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
    /// Return the filtered batch. If the entire batch is filtered out, return None.
    ///
    /// Supported filter expr type is defined in [SimpleFilterEvaluator].
    ///
    /// When a filter is referencing primary key column, this method will decode
    /// the primary key and put it into the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        // Start with all rows selected; each filter ANDs its result into the mask.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run filter one by one and combine them result
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    // The batch carries one encoded primary key, so a tag filter
                    // either keeps or drops the whole batch. Decode lazily and
                    // cache the decoded values on the batch.
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // PK not match means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true
                    if skip_fields {
                        continue;
                    }
                    // Safety: Input is Batch so we are using primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        // Short circuit: all rows already filtered out by simple filters.
        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        // Apply partition filter
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }
591
592    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
593    ///
594    /// It assumes all necessary tags are already decoded from the primary key.
595    ///
596    /// # Arguments
597    /// * `input` - The RecordBatch to filter
598    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
599    pub(crate) fn precise_filter_flat(
600        &self,
601        input: RecordBatch,
602        skip_fields: bool,
603    ) -> Result<Option<RecordBatch>> {
604        let mut tag_decode_state = TagDecodeState::new();
605        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
606
607        // If mask is None, the entire batch is filtered out
608        let Some(mut mask) = mask else {
609            return Ok(None);
610        };
611
612        // Apply partition filter
613        if let Some(partition_filter) = &self.partition_filter {
614            let record_batch = self.project_record_batch_for_pruning_flat(
615                &input,
616                &partition_filter.partition_schema,
617                &mut tag_decode_state,
618            )?;
619            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
620            mask = mask.bitand(&partition_mask);
621        }
622
623        if mask.count_set_bits() == 0 {
624            return Ok(None);
625        }
626
627        let filtered_batch =
628            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
629                .context(ComputeArrowSnafu)?;
630
631        if filtered_batch.num_rows() > 0 {
632            Ok(Some(filtered_batch))
633        } else {
634            Ok(None)
635        }
636    }
637
638    /// Computes the filter mask for the input RecordBatch based on pushed down predicates.
639    /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
640    ///
641    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
642    ///
643    /// # Arguments
644    /// * `input` - The RecordBatch to compute mask for
645    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
646    pub(crate) fn compute_filter_mask_flat(
647        &self,
648        input: &RecordBatch,
649        skip_fields: bool,
650        tag_decode_state: &mut TagDecodeState,
651    ) -> Result<Option<BooleanBuffer>> {
652        let mut mask = BooleanBuffer::new_set(input.num_rows());
653
654        let flat_format = self
655            .read_format
656            .as_flat()
657            .context(crate::error::UnexpectedSnafu {
658                reason: "Expected flat format for precise_filter_flat",
659            })?;
660        let metadata = flat_format.metadata();
661
662        // Run filter one by one and combine them result
663        for filter_ctx in &self.filters {
664            let filter = match filter_ctx.filter() {
665                MaybeFilter::Filter(f) => f,
666                // Column matches.
667                MaybeFilter::Matched => continue,
668                // Column doesn't match, filter the entire batch.
669                MaybeFilter::Pruned => return Ok(None),
670            };
671
672            // Skip field filters if skip_fields is true
673            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
674                continue;
675            }
676
677            // Get the column directly by its projected index.
678            // If the column is missing and it's not a tag/time column, this filter is skipped.
679            // Assumes the projection indices align with the input batch schema.
680            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
681            if let Some(idx) = column_idx {
682                let column = &input.columns().get(idx).unwrap();
683                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
684                mask = mask.bitand(&result);
685            } else if filter_ctx.semantic_type() == SemanticType::Tag {
686                // Column not found in projection, it may be a tag column.
687                let column_id = filter_ctx.column_id();
688
689                if let Some(tag_column) =
690                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
691                {
692                    let result = filter
693                        .evaluate_array(&tag_column)
694                        .context(RecordBatchSnafu)?;
695                    mask = mask.bitand(&result);
696                }
697            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
698                let time_index_pos = time_index_column_index(input.num_columns());
699                let column = &input.columns()[time_index_pos];
700                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
701                mask = mask.bitand(&result);
702            }
703            // Non-tag column not found in projection.
704        }
705
706        Ok(Some(mask))
707    }
708
    /// Returns the decoded tag column for `column_id`, or `None` if it's not a tag.
    ///
    /// Decoded primary keys and materialized tag columns are memoized in
    /// `tag_decode_state`, so repeated tag filters on the same batch decode at most once.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        // Not part of the primary key => not a tag column.
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        // Reuse a previously materialized tag column when available.
        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        // Decode all primary keys of the batch once, on first use.
        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // Sparse encoding looks tags up by column id rather than by position.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        // Cache the materialized column for later filters on the same batch.
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }
753
754    /// Evaluates the partition filter against the input `RecordBatch`.
755    fn evaluate_partition_filter(
756        &self,
757        record_batch: &RecordBatch,
758        partition_filter: &PartitionFilterContext,
759    ) -> Result<BooleanBuffer> {
760        let columnar_value = partition_filter
761            .region_partition_physical_expr
762            .evaluate(record_batch)
763            .context(EvalPartitionFilterSnafu)?;
764        let array = columnar_value
765            .into_array(record_batch.num_rows())
766            .context(EvalPartitionFilterSnafu)?;
767        let boolean_array =
768            array
769                .as_any()
770                .downcast_ref::<BooleanArray>()
771                .context(UnexpectedSnafu {
772                    reason: "Failed to downcast to BooleanArray".to_string(),
773                })?;
774
775        // also need to consider nulls in the partition filter result. If a value is null, it should be treated as false (filtered out).
776        let mut mask = boolean_array.values().clone();
777        if let Some(nulls) = boolean_array.nulls() {
778            mask = mask.bitand(nulls.inner());
779        }
780
781        Ok(mask)
782    }
783
784    /// Builds a `RecordBatch` from the input `Batch` matching the given schema.
785    ///
786    /// This is used for partition expression evaluation. The schema should only contain
787    /// the columns referenced by the partition expression to minimize overhead.
788    fn build_record_batch_for_pruning(
789        &self,
790        input: &mut Batch,
791        schema: &Arc<Schema>,
792    ) -> Result<RecordBatch> {
793        let arrow_schema = schema.arrow_schema();
794        let mut columns = Vec::with_capacity(arrow_schema.fields().len());
795
796        // Decode primary key if necessary.
797        if input.pk_values().is_none() {
798            input.set_pk_values(
799                self.codec
800                    .decode(input.primary_key())
801                    .context(DecodeSnafu)?,
802            );
803        }
804
805        for field in arrow_schema.fields() {
806            let metadata = self.read_format.metadata();
807            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
808
809            // Partition pruning schema should be a subset of the input batch schema.
810            let Some(column_id) = column_id else {
811                return UnexpectedSnafu {
812                    reason: format!(
813                        "Partition pruning schema expects column '{}' but it is missing in \
814                         region metadata",
815                        field.name()
816                    ),
817                }
818                .fail();
819            };
820
821            // 1. Check if it's a tag.
822            if let Some(pk_index) = metadata.primary_key_index(column_id) {
823                let pk_values = input.pk_values().unwrap();
824                let value = match pk_values {
825                    CompositeValues::Dense(v) => &v[pk_index].1,
826                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
827                };
828                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
829                let arrow_scalar = value
830                    .try_to_scalar_value(&concrete_type)
831                    .context(DataTypeMismatchSnafu)?;
832                let array = arrow_scalar
833                    .to_array_of_size(input.num_rows())
834                    .context(EvalPartitionFilterSnafu)?;
835                columns.push(array);
836            } else if metadata.time_index_column().column_id == column_id {
837                // 2. Check if it's the timestamp column.
838                columns.push(input.timestamps().to_arrow_array());
839            } else if let Some(field_index) = self
840                .read_format
841                .as_primary_key()
842                .and_then(|f| f.field_index_by_id(column_id))
843            {
844                // 3. Check if it's a field column.
845                columns.push(input.fields()[field_index].data.to_arrow_array());
846            } else {
847                return UnexpectedSnafu {
848                    reason: format!(
849                        "Partition pruning schema expects column '{}' (id {}) but it is not \
850                         present in input batch",
851                        field.name(),
852                        column_id
853                    ),
854                }
855                .fail();
856            }
857        }
858
859        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
860    }
861
862    /// Projects the input `RecordBatch` to match the given schema.
863    ///
864    /// This is used for partition expression evaluation. The schema should only contain
865    /// the columns referenced by the partition expression to minimize overhead.
866    fn project_record_batch_for_pruning_flat(
867        &self,
868        input: &RecordBatch,
869        schema: &Arc<Schema>,
870        tag_decode_state: &mut TagDecodeState,
871    ) -> Result<RecordBatch> {
872        let arrow_schema = schema.arrow_schema();
873        let mut columns = Vec::with_capacity(arrow_schema.fields().len());
874
875        let flat_format = self
876            .read_format
877            .as_flat()
878            .context(crate::error::UnexpectedSnafu {
879                reason: "Expected flat format for precise_filter_flat",
880            })?;
881        let metadata = flat_format.metadata();
882
883        for field in arrow_schema.fields() {
884            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
885
886            let Some(column_id) = column_id else {
887                return UnexpectedSnafu {
888                    reason: format!(
889                        "Partition pruning schema expects column '{}' but it is missing in \
890                         region metadata",
891                        field.name()
892                    ),
893                }
894                .fail();
895            };
896
897            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
898                columns.push(input.column(idx).clone());
899                continue;
900            }
901
902            if metadata.time_index_column().column_id == column_id {
903                let time_index_pos = time_index_column_index(input.num_columns());
904                columns.push(input.column(time_index_pos).clone());
905                continue;
906            }
907
908            if let Some(tag_column) =
909                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
910            {
911                columns.push(tag_column);
912                continue;
913            }
914
915            return UnexpectedSnafu {
916                reason: format!(
917                    "Partition pruning schema expects column '{}' (id {}) but it is not \
918                     present in projected record batch",
919                    field.name(),
920                    column_id
921                ),
922            }
923            .fail();
924        }
925
926        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
927    }
928}