mito2/sst/parquet/file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
    EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
    UnexpectedSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;
/// Checks if a row group contains delete operations by examining the min value of the op_type column.
///
/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
/// or an error if the statistics are not present or cannot be decoded.
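///
/// A minimal usage sketch (hypothetical call site; `meta` and `path` stand for the
/// SST's decoded parquet metadata and file path):
///
/// ```ignore
/// if row_group_contains_delete(meta, row_group_idx, path)? {
///     // Field filters will be skipped for this row group (see PreFilterMode).
/// }
/// ```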
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // Safety: the last column of an SST is always op_type.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
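    // op_type is stored as an i32 and OpType::Delete is the smallest variant, so the
    // column minimum equals Delete iff the row group contains at least one delete.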
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

/// A range of a parquet SST. Currently a range is a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Performs pruning before reading the [FileRange].
    /// It uses the latest dynamic filters together with row group statistics to prune the range.
    ///
    /// Returns false if the entire range is pruned and can be skipped.
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.compute_skip_fields(),
        );

        // Creating a predicate here is cheap because the dynamic filters are wrapped in Arc.
        let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone());

        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true) // Unexpected; don't skip the range just in case.
    }

    /// Returns whether field filters should be skipped for this row group,
    /// depending on the pre-filter mode.
    fn compute_skip_fields(&self) -> bool {
        match self.context.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains a delete op.
                // If the stats are missing or undecodable, conservatively assume it does.
                row_group_contains_delete(
                    self.context.reader_builder.parquet_metadata(),
                    self.row_group_idx,
                    self.context.reader_builder.file_path(),
                )
                .unwrap_or(true)
            }
        }
    }

    /// Returns a reader to read the [FileRange].
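    ///
    /// A minimal usage sketch (assuming the returned [PruneReader] is drained via its
    /// `next_batch` method):
    ///
    /// ```ignore
    /// if let Some(mut prune_reader) = range.reader(None, None).await? {
    ///     while let Some(batch) = prune_reader.next_batch().await? {
    ///         // Consume the filtered batch.
    ///     }
    /// }
    /// ```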
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No last-row selector, use RowGroupReader.
            false
        };

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // The row group may contain DELETE ops or not all rows are selected,
            // fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

    /// Creates a flat reader that returns RecordBatch.
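    /// It takes no [TimeSeriesRowSelector], so the last-row optimization is never applied.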
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        // Compute skip_fields once for this row group
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(Some(flat_prune_reader))
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

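/// Shared, reference-counted [FileRangeContext].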
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true if a partition filter is configured.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the `CompatBatch` for the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    /// If a partition expr filter is configured, it is also applied.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Determines whether to skip field filters based on PreFilterMode and the row group's delete status.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains a delete op.
                // If the stats are missing or undecodable, conservatively assume it does.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Decodes parquet metadata and checks whether the row group contains a delete op.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Returns the estimated memory size of this context.
    /// Mainly accounts for the parquet metadata size.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}

/// Mode to pre-filter columns in a range.
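///
/// Skipping field filters matters when a range may contain delete ops: applying a field
/// filter before merge could drop a DELETE row and let an older version of the same key
/// resurface.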
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// If the range contains delete ops or lacks statistics, skips filtering fields.
    /// Otherwise, filters all columns.
    SkipFieldsOnDelete,
    /// Always skips fields.
    SkipFields,
}

/// Context for partition expression filtering.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema containing only columns referenced by the partition expression.
    /// This is used to build a minimal RecordBatch for partition filter evaluation.
    pub(crate) partition_schema: Arc<Schema>,
}

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filter physical exprs.
    pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Expected region metadata to read, if any.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used for pruning with dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional helper to project batches.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Partition filter.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}

/// State for lazily decoding tag columns from primary keys while filtering
/// flat record batches.
pub(crate) struct TagDecodeState {
    /// Primary keys decoded from the batch, populated on first use.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Cache of decoded tag arrays keyed by column id.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}

impl TagDecodeState {
    /// Creates an empty state.
    pub(crate) fn new() -> Self {
        Self {
            decoded_pks: None,
            decoded_tag_cache: HashMap::new(),
        }
    }
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run filters one by one and combine their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
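                    // Look up this filter's tag value in the decoded primary key.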
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
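                    // A batch contains rows of a single primary key, so one scalar
                    // evaluation decides the whole batch.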
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A PK mismatch means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true.
                    if skip_fields {
                        continue;
                    }
                    // Safety: Input is Batch so we are using the primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        // Apply the partition filter.
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }

    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    ///
    /// Tags referenced by filters are decoded lazily from the primary key when they
    /// are not already present in the projection.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to filter
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;

        // If mask is None, the entire batch is filtered out
        let Some(mut mask) = mask else {
            return Ok(None);
        };

        // Apply partition filter
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes the filter mask for the input RecordBatch based on pushed down predicates.
    /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
    ///
    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to compute mask for
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        // Run filters one by one and combine their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skip field filters if skip_fields is true.
            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Get the column directly by its projected index.
            // If the column is missing and it's not a tag/time column, this filter is skipped.
            // Assumes the projection indices align with the input batch schema.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = input.column(idx);
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // Tag columns may be absent from the projection; decode them from the
                // primary key instead.
                let column_id = filter_ctx.column_id();

                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
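                // The time index has a fixed position relative to the end of the flat
                // schema, so its index can be derived from the column count.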
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
            // Non-tag columns not found in the projection are skipped.
        }

        Ok(Some(mask))
    }

    /// Returns the decoded tag column for `column_id`, or `None` if it's not a tag.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

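        // Sparse encoding is looked up by column id rather than by position, so no
        // positional index is passed.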
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

    /// Evaluates the partition filter against the input `RecordBatch`.
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array =
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .context(UnexpectedSnafu {
                    reason: "Failed to downcast to BooleanArray".to_string(),
                })?;

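        // Note: this takes the raw values buffer and ignores any null bitmap, so the
        // partition expression is expected to produce non-null booleans.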
        Ok(boolean_array.values().clone())
    }

    /// Builds a `RecordBatch` from the input `Batch` matching the given schema.
    ///
    /// This is used for partition expression evaluation. The schema should only contain
    /// the columns referenced by the partition expression to minimize overhead.
    fn build_record_batch_for_pruning(
        &self,
        input: &mut Batch,
        schema: &Arc<Schema>,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        // Decode primary key if necessary.
        if input.pk_values().is_none() {
            input.set_pk_values(
                self.codec
                    .decode(input.primary_key())
                    .context(DecodeSnafu)?,
            );
        }

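        // For each column the partition expression references, materialize an array:
        // tags are broadcast from the decoded primary key as constants, while the time
        // index and fields are taken from the batch directly.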
        for field in arrow_schema.fields() {
            let metadata = self.read_format.metadata();
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            // Partition pruning schema should be a subset of the input batch schema.
            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            // 1. Check if it's a tag.
            if let Some(pk_index) = metadata.primary_key_index(column_id) {
                let pk_values = input.pk_values().unwrap();
                let value = match pk_values {
                    CompositeValues::Dense(v) => &v[pk_index].1,
                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
                };
                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
                let arrow_scalar = value
                    .try_to_scalar_value(&concrete_type)
                    .context(DataTypeMismatchSnafu)?;
                let array = arrow_scalar
                    .to_array_of_size(input.num_rows())
                    .context(EvalPartitionFilterSnafu)?;
                columns.push(array);
            } else if metadata.time_index_column().column_id == column_id {
                // 2. Check if it's the timestamp column.
                columns.push(input.timestamps().to_arrow_array());
            } else if let Some(field_index) = self
                .read_format
                .as_primary_key()
                .and_then(|f| f.field_index_by_id(column_id))
            {
                // 3. Check if it's a field column.
                columns.push(input.fields()[field_index].data.to_arrow_array());
            } else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' (id {}) but it is not \
                         present in input batch",
                        field.name(),
                        column_id
                    ),
                }
                .fail();
            }
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }

    /// Projects the input `RecordBatch` to match the given schema.
    ///
    /// This is used for partition expression evaluation. The schema should only contain
    /// the columns referenced by the partition expression to minimize overhead.
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}