mito2/sst/parquet/file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};

/// Checks if a row group contains delete operations by examining the min value of the op_type column.
///
/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
/// or an error if the statistics are not present or cannot be decoded.
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // Safety: the last column of an SST must be op_type.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}
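
// Illustrative sketch (not part of this module): the min statistic of the
// op_type column is stored as 4 little-endian bytes, so the decoding above is
// equivalent to the following. `min_bytes` is fabricated for the example;
// real bytes come from the parquet column statistics.
//
//     let min_bytes: &[u8] = &(OpType::Delete as i32).to_le_bytes();
//     let contains_delete = <[u8; 4]>::try_from(min_bytes)
//         .map(i32::from_le_bytes)
//         .map(|v| v == OpType::Delete as i32)
//         .unwrap_or(false);
//     assert!(contains_delete);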

/// A range of a parquet SST. Currently it corresponds to a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if the row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No selector provided, use RowGroupReader.
            false
        };

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // The row group may contain DELETE ops or only part of its rows are
            // selected, so fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(prune_reader)
    }
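
    // Usage sketch (hypothetical caller code, assuming `PruneReader` exposes a
    // `next_batch` method):
    //
    //     let mut reader = range.reader(Some(TimeSeriesRowSelector::LastRow)).await?;
    //     while let Some(batch) = reader.next_batch().await? {
    //         // consume `batch`
    //     }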

    /// Creates a flat reader that returns RecordBatch.
    pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(flat_prune_reader)
    }
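
    // Usage sketch (hypothetical caller code, assuming `FlatPruneReader` exposes
    // a `next_batch` method like `PruneReader`): the flat reader yields arrow
    // `RecordBatch`es instead of `Batch`es.
    //
    //     let mut reader = range.flat_reader().await?;
    //     while let Some(record_batch) = reader.next_batch().await? {
    //         // consume `record_batch`
    //     }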

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
        pre_filter_mode: PreFilterMode,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
                pre_filter_mode,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the `CompatBatch` for the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input RecordBatch by the pushed-down predicates and returns a RecordBatch.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Determines whether to skip field filters based on the [PreFilterMode] and the row group's delete status.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains delete ops.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Checks the parquet metadata to find whether the row group contains delete ops.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

/// Mode to pre-filter columns in a range.
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// If the range doesn't contain delete ops, filters all columns.
    /// Otherwise, or if the statistics are unavailable, skips filtering fields.
    SkipFieldsOnDelete,
    /// Always skips fields.
    SkipFields,
}
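
// Illustrative mapping (a restatement of `FileRangeContext::should_skip_fields`
// above, not additional behavior):
//
//     PreFilterMode::All                => skip_fields = false
//     PreFilterMode::SkipFields         => skip_fields = true
//     PreFilterMode::SkipFieldsOnDelete => skip_fields = contains_delete(row_group),
//                                          or true if the op_type stats can't be read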

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// The supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter.
    /// * `skip_fields` - Whether to skip field filters based on the [PreFilterMode] and the row group's delete status.
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run the filters one by one and combine their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A primary key mismatch means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true.
                    if skip_fields {
                        continue;
                    }
                    // Safety: input is a Batch so we are using the primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
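
    // Mask-combining sketch (standalone, with fabricated inputs): every filter
    // produces a `BooleanBuffer`, and the final mask is their conjunction.
    //
    //     let mask = BooleanBuffer::new_set(4); // all rows selected
    //     let result = BooleanBuffer::collect_bool(4, |i| i != 1); // row 1 filtered out
    //     let combined = mask.bitand(&result);
    //     assert_eq!(combined.count_set_bits(), 3);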

    /// Filters the input RecordBatch by the pushed-down predicates and returns a RecordBatch.
    ///
    /// It assumes all necessary tags are already decoded from the primary key.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to filter.
    /// * `skip_fields` - Whether to skip field filters based on the [PreFilterMode] and the row group's delete status.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        // If the mask is None, the entire batch is filtered out.
        let Some(mask) = mask else {
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }
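
    // Sketch of the arrow call above (standalone, with a fabricated one-column
    // schema): `filter_record_batch` keeps only the rows where the mask is true.
    //
    //     use datatypes::arrow::array::Int64Array;
    //     use datatypes::arrow::compute::filter_record_batch;
    //     use datatypes::arrow::datatypes::{DataType, Field, Schema};
    //
    //     let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    //     let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
    //     let mask = BooleanArray::from(vec![true, false, true]);
    //     let filtered = filter_record_batch(&batch, &mask)?;
    //     assert_eq!(filtered.num_rows(), 2);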

    /// Computes the filter mask for the input RecordBatch based on the pushed-down predicates.
    ///
    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to compute the mask for.
    /// * `skip_fields` - Whether to skip field filters based on the [PreFilterMode] and the row group's delete status.
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Decodes primary keys once if we have any tag filters not in the projection.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches decoded tag arrays by column id to avoid redundant decoding.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        // Run the filters one by one and combine their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skip field filters if skip_fields is true.
            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Get the column directly by its projected index.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // Column not found in the projection, it may be a tag column.
                // Decodes primary keys if not already decoded.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                // Check the cache first.
                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    cached_column.clone()
                } else {
                    // For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        // Cache the decoded tag column.
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
            // Otherwise: a non-tag column that is not in the projection, nothing to filter.
        }

        Ok(Some(mask))
    }
}