mito2/sst/parquet/
file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;

/// Checks if a row group contains delete operations by examining the min value of the op_type column.
///
/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
/// or an error if the statistics are not present or cannot be decoded.
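///
/// The check works because `OpType::Delete` is the smallest op type value, so any
/// delete row pulls the column's min statistic down to `Delete`. A minimal sketch
/// of a call site (hypothetical path and index; real callers live in this module,
/// e.g. [FileRangeContext::contains_delete]):
///
/// ```ignore
/// let has_delete = row_group_contains_delete(&parquet_meta, 0, "table/region/file.parquet")?;
/// ```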
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // Safety: the last column of an SST must be op_type.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

/// A range of a parquet SST. Currently a range maps to a row group.
/// Different file ranges can be read in parallel.
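///
/// A sketch of concurrent reads over ranges (illustrative only; `ranges` is a
/// hypothetical `Vec<FileRange>` and error handling is elided):
///
/// ```ignore
/// // Each range is cheap to clone and independent of the others.
/// let readers = futures::future::try_join_all(
///     ranges.iter().map(|range| range.reader(None, None)),
/// )
/// .await?;
/// ```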
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Performs pruning before reading the [FileRange].
    /// It uses the latest dynamic filters together with row group statistics to prune the range.
    ///
    /// Returns false if the entire range is pruned and can be skipped.
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.compute_skip_fields(),
        );

        // Creating a predicate here is cheap since the dynamic filters are wrapped in Arc.
        let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone());

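        // `prune_with_stats` yields one keep/skip flag per row group; the stats
        // above wrap a single row group, so the first element is the answer for
        // this range.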
        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true) // Unexpected; don't skip the range just in case.
    }

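    /// Resolves [PreFilterMode] into the `skip_fields` flag used when pruning
    /// this range. Mirrors [FileRangeContext::should_skip_fields]; if the delete
    /// check fails, it conservatively assumes the row group contains deletes.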
    fn compute_skip_fields(&self) -> bool {
        match self.context.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains delete op
                row_group_contains_delete(
                    self.context.reader_builder.parquet_metadata(),
                    self.row_group_idx,
                    self.context.reader_builder.file_path(),
                )
                .unwrap_or(true)
            }
        }
    }

    /// Returns a reader to read the [FileRange], or `None` if the range is
    /// pruned by the dynamic filters.
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if the row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No LastRow selector, use RowGroupReader.
            false
        };

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Not eligible for LastRowReader (deletes present, partial selection,
            // or no LastRow selector), fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

    /// Creates a flat reader that returns [RecordBatch]es, or `None` if the
    /// range is pruned by the dynamic filters.
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(Some(flat_prune_reader))
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the `CompatBatch` to the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input [RecordBatch] by the pushed-down predicates and returns the filtered batch.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Determines whether to skip field filters based on [PreFilterMode] and the row group's delete status.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains delete op
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Decodes parquet metadata and checks whether the row group contains delete ops.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

/// Mode to pre-filter columns in a range.
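///
/// Each mode resolves to a per-row-group `skip_fields` flag (see
/// [FileRangeContext::should_skip_fields]). A sketch of the mapping, where
/// `has_delete` stands for the row group delete check:
///
/// ```ignore
/// let skip_fields = match mode {
///     PreFilterMode::All => false,
///     PreFilterMode::SkipFields => true,
///     PreFilterMode::SkipFieldsOnDelete => has_delete,
/// };
/// ```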
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// If the range doesn't contain delete ops, filters all columns.
    /// If it does contain delete ops, or the statistics are unavailable,
    /// skips filtering fields.
    SkipFieldsOnDelete,
    /// Always skips fields.
    SkipFields,
}

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filter physical exprs.
    pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used for pruning with dynamic filters.
    pub(crate) prune_schema: Arc<Schema>,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
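    ///
    /// A sketch of the per-filter semantics implemented below (tag predicates are
    /// evaluated once per batch since a [Batch] holds a single primary key, while
    /// field/time predicates contribute to a per-row mask):
    ///
    /// ```ignore
    /// // Tag filter: one scalar check; a mismatch drops the whole batch.
    /// if !filter.evaluate_scalar(&pk_value)? {
    ///     return Ok(None);
    /// }
    /// // Field/time filter: per-row result, AND-ed into the combined mask.
    /// mask = mask.bitand(&filter.evaluate_vector(column)?);
    /// ```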
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run the filters one by one and combine their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A primary key mismatch means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true.
                    if skip_fields {
                        continue;
                    }
                    // Safety: the input is a Batch so we are using the primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }

    /// Filters the input [RecordBatch] by the pushed-down predicates and returns the filtered batch.
    ///
    /// It assumes all necessary tags are already decoded from the primary key.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to filter
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        // If the mask is None, the entire batch is filtered out.
        let Some(mask) = mask else {
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes the filter mask for the input [RecordBatch] based on the pushed-down predicates.
    ///
    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to compute the mask for
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
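    ///
    /// Tag columns that are absent from the projection are decoded from the
    /// encoded primary key at most once per batch, and each decoded tag array is
    /// cached by column id so several filters on the same tag don't re-decode it.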
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Decodes primary keys once if we have any tag filters not in the projection.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches decoded tag arrays by column id to avoid redundant decoding.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        // Run the filters one by one and combine their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skip field filters if skip_fields is true.
            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Get the column directly by its projected index.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // Column not found in the projection, it may be a tag column.
                // Decodes primary keys if not already decoded.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                // Check the cache first.
                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    cached_column.clone()
                } else {
                    // For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        // Cache the decoded tag column.
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
            // A non-tag column missing from the projection cannot be filtered here; keep all rows.
        }

        Ok(Some(mask))
    }
}