// mito2/sst/parquet/file_range.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and functions for reading ranges from a parquet file. A file range
16//! is usually a row group in a parquet file.
17
18use std::collections::HashMap;
19use std::ops::BitAnd;
20use std::sync::Arc;
21
22use api::v1::{OpType, SemanticType};
23use common_telemetry::error;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
26use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
27use datatypes::arrow::buffer::BooleanBuffer;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::Schema;
30use mito_codec::row_converter::PrimaryKeyCodec;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::file::metadata::ParquetMetaData;
33use snafu::{OptionExt, ResultExt};
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, TimeSeriesRowSelector};
37use table::predicate::Predicate;
38
39use crate::cache::CacheStrategy;
40use crate::error::{
41    ComputeArrowSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
42    RecordBatchSnafu, Result, StatsNotPresentSnafu, UnexpectedSnafu,
43};
44use crate::read::compat::FlatCompatBatch;
45use crate::read::flat_projection::CompactionProjectionMapper;
46use crate::read::last_row::FlatRowGroupLastRowCachedReader;
47use crate::read::prune::FlatPruneReader;
48use crate::sst::file::FileHandle;
49use crate::sst::parquet::flat_format::{
50    DecodedPrimaryKeys, FlatReadFormat, decode_primary_keys, time_index_column_index,
51};
52use crate::sst::parquet::reader::{
53    FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReaderBuilder,
54    SimpleFilterContext,
55};
56use crate::sst::parquet::row_group::ParquetFetchMetrics;
57use crate::sst::parquet::stats::RowGroupPruningStats;
58
59/// Checks if a row group contains delete operations by examining the min value of op_type column.
60///
61/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
62/// or an error if the statistics are not present or cannot be decoded.
63pub(crate) fn row_group_contains_delete(
64    parquet_meta: &ParquetMetaData,
65    row_group_index: usize,
66    file_path: &str,
67) -> Result<bool> {
68    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
69
70    // safety: The last column of SST must be op_type
71    let column_metadata = &row_group_metadata.columns().last().unwrap();
72    let stats = column_metadata
73        .statistics()
74        .context(StatsNotPresentSnafu { file_path })?;
75    stats
76        .min_bytes_opt()
77        .context(StatsNotPresentSnafu { file_path })?
78        .try_into()
79        .map(i32::from_le_bytes)
80        .map(|min_op_type| min_op_type == OpType::Delete as i32)
81        .ok()
82        .context(DecodeStatsSnafu { file_path })
83}
84
85/// A range of a parquet SST. Now it is a row group.
86/// We can read different file ranges in parallel.
/// A range of a parquet SST. Now it is a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context. Cheap to clone since the context is behind an `Arc`.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}
96
impl FileRange {
    /// Creates a new [FileRange] over `row_group_idx` of the SST described by `context`.
    ///
    /// `row_selection` restricts the rows to read inside the row group; `None` reads all rows.
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if [FileRange] selects all rows in row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        // No explicit selection means the whole row group is read.
        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        // A selection covering exactly the row group's row count is equivalent to "all rows".
        row_selection.row_count() == rows_in_group as usize
    }

    /// Performs pruning before reading the [FileRange].
    /// It uses the latest dynamic filters with row group statistics to prune the range.
    ///
    /// Returns false if the entire range is pruned and can be skipped.
    fn in_dynamic_filter_range(&self) -> bool {
        // Without dynamic filters nothing can be pruned; keep the range.
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        // Build pruning stats for just this one row group.
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.context.base.pre_filter_mode.skip_fields(),
        );

        // not costly to create a predicate here since dynamic filters are wrapped in Arc
        let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());

        // `prune_with_stats` yields one keep-flag per row group; we passed a single
        // row group, so only the first entry matters.
        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true) // unexpected, not skip just in case
    }

    /// Creates a flat reader that returns RecordBatch.
    ///
    /// Returns `Ok(None)` when the whole range is pruned by dynamic filters.
    /// When `selector` is `LastRow` and the row group is put-only with all rows
    /// selected, a cached last-row reader is used instead of a plain row group reader.
    pub(crate) async fn flat_reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        // Compute skip_fields once for this row group
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if row group does not contain DELETE
            // and all rows are selected.
            // On a stats decode error we conservatively assume deletes exist and fall back.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let flat_prune_reader = if use_last_row_reader {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            // Flat PK prefilter makes the input stream predicate-dependent, so cached
            // selector results are not reusable across queries with different filters.
            let cache_strategy = if self.context.reader_builder.has_predicate_prefilter() {
                CacheStrategy::Disabled
            } else {
                self.context.reader_builder.cache_strategy().clone()
            };
            let reader = FlatRowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                cache_strategy,
                self.context.read_format().parquet_read_columns(),
                flat_row_group_reader,
            );
            FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                flat_row_group_reader,
                skip_fields,
            )
        };

        Ok(Some(flat_prune_reader))
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}
242
243/// Context shared by ranges of the same parquet SST.
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context (filters, formats and helpers shared by ranges).
    base: RangeBase,
}

/// Shared, reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
252
impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true if a partition filter is configured.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &FlatReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the helper to project batches.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the compat helper to the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<FlatCompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    /// If a partition expr filter is configured, it is also applied.
    /// Physical filter exprs are not evaluated here; they are only applied during prefiltering.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Returns the pre-filter mode configured for this context.
    pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
        self.base.pre_filter_mode
    }

    /// Decodes parquet metadata and finds if row group contains delete op.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Creates a [RowGroupBuildContext] for building row group readers with prefiltering.
    pub(crate) fn build_context<'a>(
        &'a self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&'a ParquetFetchMetrics>,
    ) -> RowGroupBuildContext<'a> {
        RowGroupBuildContext {
            row_group_idx,
            row_selection,
            fetch_metrics,
        }
    }

    /// Returns the estimated memory size of this context.
    /// Mainly accounts for the parquet metadata size.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}
338
/// Strategy deciding which columns take part in pre-filtering for a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// Always skip fields.
    SkipFields,
}

impl PreFilterMode {
    /// Returns whether field columns are excluded from filtering under this mode.
    pub(crate) fn skip_fields(self) -> bool {
        match self {
            Self::All => false,
            Self::SkipFields => true,
        }
    }
}
353
/// Context for partition expression filtering.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule, evaluated against batches.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema containing only columns referenced by the partition expression.
    /// This is used to build a minimal RecordBatch for partition filter evaluation.
    pub(crate) partition_schema: Arc<Schema>,
}
361
/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filter physical exprs.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: FlatReadFormat,
    /// Expected region metadata, used when pruning with row group statistics.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used for pruning with dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Decoder for primary keys
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<FlatCompatBatch>,
    /// Optional helper to project batches.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Partition filter.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}
384
385pub(crate) struct TagDecodeState {
386    decoded_pks: Option<DecodedPrimaryKeys>,
387    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
388}
389
390impl TagDecodeState {
391    pub(crate) fn new() -> Self {
392        Self {
393            decoded_pks: None,
394            decoded_tag_cache: HashMap::new(),
395        }
396    }
397}
398
impl RangeBase {
    /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
    ///
    /// It assumes all necessary tags are already decoded from the primary key.
    ///
    /// Returns `Ok(None)` when every row is filtered out.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to filter
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        // Tag decode state is scoped to this batch so decoded primary keys can be
        // shared between predicate filtering and partition filtering below.
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;

        // If mask is None, the entire batch is filtered out
        let Some(mut mask) = mask else {
            return Ok(None);
        };

        // Apply partition filter
        if let Some(partition_filter) = &self.partition_filter {
            // Build a minimal batch containing only the columns the partition expr needs.
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        // Short-circuit before materializing a filtered batch when nothing survives.
        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes the filter mask for the input RecordBatch based on pushed down predicates.
    /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
    /// Physical filter exprs are excluded here and only apply during prefiltering.
    ///
    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to compute mask for
    /// * `skip_fields` - Whether to skip field filters based on PreFilterMode
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        // Start with all rows selected; each filter ANDs its result into the mask.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let metadata = self.read_format.metadata();

        // Run filter one by one and combine them result
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skip field filters if skip_fields is true
            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Get the column directly by its projected index.
            // If the column is missing and it's not a tag/time column, this filter is skipped.
            // Assumes the projection indices align with the input batch schema.
            let column_idx = self
                .read_format
                .projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns().get(idx).unwrap();
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // Column not found in projection, it may be a tag column.
                let column_id = filter_ctx.column_id();

                // Lazily decode the tag from the primary key; skip the filter if the
                // column turns out not to be a decodable tag.
                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                // The time index column sits at a fixed position in the flat format.
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
            // Non-tag column not found in projection.
        }

        Ok(Some(mask))
    }

    /// Returns the decoded tag column for `column_id`, or `None` if it's not a tag.
    ///
    /// Decoded primary keys and tag arrays are cached in `tag_decode_state`, so repeated
    /// lookups for the same column within one batch decode only once.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        // Not part of the primary key => not a tag column.
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        // Decode primary keys for this batch at most once.
        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // Sparse encoding looks tags up by column id rather than position.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

    /// Evaluates the partition filter against the input `RecordBatch`.
    ///
    /// Returns a boolean mask where `true` means the row satisfies the partition expr.
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        // The partition expression must produce a boolean array.
        let boolean_array =
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .context(UnexpectedSnafu {
                    reason: "Failed to downcast to BooleanArray".to_string(),
                })?;

        // also need to consider nulls in the partition filter result. If a value is null, it should be treated as false (filtered out).
        let mut mask = boolean_array.values().clone();
        if let Some(nulls) = boolean_array.nulls() {
            mask = mask.bitand(nulls.inner());
        }

        Ok(mask)
    }

    /// Projects the input `RecordBatch` to match the given schema.
    ///
    /// This is used for partition expression evaluation. The schema should only contain
    /// the columns referenced by the partition expression to minimize overhead.
    ///
    /// Columns are resolved from the projected batch, the time index position, or by
    /// decoding tags from the primary key, in that order; a column that cannot be
    /// resolved is an error.
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let metadata = self.read_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            // Fast path: the column is present in the projected batch.
            if let Some(idx) = self.read_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            // The time index column lives at a fixed position in the flat format.
            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            // Fall back to decoding the tag column from the primary key.
            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
650
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};

    /// Builds a minimal [RangeBase] over the test region metadata with the
    /// given pushed-down filters and no dynamic/partition filters.
    fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());

        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();

        RangeBase {
            filters,
            dyn_filters: vec![],
            read_format,
            expected_metadata: None,
            prune_schema: metadata.schema.clone(),
            codec: mito_codec::row_converter::build_primary_key_codec(metadata.as_ref()),
            compat_batch: None,
            compaction_projection_mapper: None,
            pre_filter_mode: PreFilterMode::All,
            partition_filter: None,
        }
    }

    /// Simple filters should all be ANDed into the mask: the batch's tag is "b"
    /// while the filter requires "a", so every row is masked out.
    #[test]
    fn test_compute_filter_mask_flat_applies_remaining_simple_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = vec![
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(),
        ];
        let base = new_test_range_base(filters);
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        let mask = base
            .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(mask.count_set_bits(), 0);
    }

    /// Physical filters are prefilter-only: even with a valid physical filter on
    /// field_0, `compute_filter_mask_flat` must leave all rows selected.
    #[test]
    fn test_compute_filter_mask_flat_does_not_postfilter_physical_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();
        let physical_filter = crate::sst::parquet::reader::PhysicalFilterContext::new_opt(
            &metadata,
            None,
            &read_format,
            &col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false),
        );
        assert!(physical_filter.is_some());
        let base = new_test_range_base(vec![]);
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        let mask = base
            .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(mask.count_set_bits(), 4);
    }
}