// mito2/sst/parquet/file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;

/// Checks if a row group contains delete operations by examining the min value
/// of the op_type column.
///
/// Returns `Ok(true)` if the row group contains delete operations, `Ok(false)` if it doesn't,
/// or an error if the statistics are not present or cannot be decoded.
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // Safety: the last column of an SST must be op_type.
    let column_metadata = row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}
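
// The check above relies on two invariants: `op_type` is the last column of
// every SST, and `OpType::Delete` is assumed to have the smallest encoded
// value, so a row group contains a delete iff the column's min statistic
// decodes to it. A minimal sketch of the byte-level decoding, assuming the
// statistic stores the min as little-endian i32 bytes:
//
//     let min_bytes = (OpType::Delete as i32).to_le_bytes();
//     assert_eq!(i32::from_le_bytes(min_bytes), OpType::Delete as i32);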

/// A range of a parquet SST. Currently it is a row group.
/// We can read different file ranges in parallel.
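///
/// A minimal usage sketch (hedged: `ranges` and the surrounding error handling
/// are illustrative, not part of this module):
///
/// ```ignore
/// for range in ranges {
///     // Read the row group with no row selector and no fetch metrics.
///     let mut reader = range.reader(None, None).await?;
///     while let Some(batch) = reader.next_batch().await? {
///         // Process the pruned batch.
///     }
/// }
/// ```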
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if the row group does not contain DELETE ops
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No selector provided, use RowGroupReader.
            false
        };

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // The row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Not eligible for the last row reader, fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(prune_reader)
    }
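
    // Decision summary for `reader` above (a restatement of the logic, not new
    // behavior): the last row reader is only used when all three conditions
    // hold, otherwise the plain row group reader is used.
    //
    //     selector == Some(TimeSeriesRowSelector::LastRow)
    //         && the row group has no DELETE (missing stats count as "may have")
    //         && the row selection covers every row in the group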

    /// Creates a flat reader that returns RecordBatch.
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<FlatPruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        // Compute skip_fields once for this row group.
        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(flat_prune_reader)
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
        pre_filter_mode: PreFilterMode,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
                pre_filter_mode,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }
    /// Sets the `CompatBatch` of the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input RecordBatch by the pushed-down predicates and returns a RecordBatch.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Determines whether to skip field filters based on the [PreFilterMode] and the
    /// row group's delete status.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if this specific row group contains delete ops. If statistics
                // are missing or undecodable, conservatively assume it does.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Checks the parquet metadata and returns whether the row group contains delete ops.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

/// Mode to pre-filter columns in a range.
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Filters all columns.
    All,
    /// If the range doesn't contain delete ops, filters all columns.
    /// Otherwise, or if statistics are unavailable, skips filtering field columns.
    SkipFieldsOnDelete,
    /// Always skips filtering field columns.
    SkipFields,
}
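
// Why skipping field filters can matter (context, not a behavioral change):
// pre-filtering by field values happens before ranges from different files are
// merged, and dropping a row of a row group that may contain DELETE ops could
// also drop the tombstone that should hide an older row elsewhere, so
// `SkipFieldsOnDelete` defers field filters in that case. A hedged sketch of
// how a caller might pick a mode (`append_only_region` is illustrative):
//
//     let mode = if append_only_region {
//         PreFilterMode::All // no DELETE ops can exist, always safe
//     } else {
//         PreFilterMode::SkipFieldsOnDelete
//     };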

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode to pre-filter columns.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and stores it in the batch.
    ///
    /// # Arguments
    /// * `input` - The batch to filter
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run the filters one by one and combine their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The column always matches.
                MaybeFilter::Matched => continue,
                // The column doesn't match, filter out the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A primary key mismatch filters out the entire batch.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip field filters if skip_fields is true.
                    if skip_fields {
                        continue;
                    }
                    // Safety: the input is a Batch, so we are using the primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
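
    // A minimal sketch of the mask combination used above, with illustrative
    // values (`new_set` creates an all-true buffer; each filter result is
    // AND-ed in, so only rows passing every filter stay selected):
    //
    //     let mask = BooleanBuffer::new_set(4);                            // 1111
    //     let result = BooleanBuffer::from_iter([true, false, true, true]);
    //     let mask = mask.bitand(&result);                                 // 1011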

    /// Filters the input RecordBatch by the pushed-down predicates and returns a RecordBatch.
    ///
    /// It assumes all necessary tags are already decoded from the primary key.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to filter
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        // If the mask is None, the entire batch is filtered out.
        let Some(mask) = mask else {
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }
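
    // A hedged, self-contained sketch of the arrow call above (column name and
    // values are illustrative):
    //
    //     use std::sync::Arc;
    //     use datatypes::arrow::array::{BooleanArray, Int32Array};
    //     use datatypes::arrow::compute::filter_record_batch;
    //     use datatypes::arrow::record_batch::RecordBatch;
    //
    //     let batch = RecordBatch::try_from_iter([(
    //         "field_0",
    //         Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
    //     )])
    //     .unwrap();
    //     let mask = BooleanArray::from(vec![true, false, true]);
    //     let filtered = filter_record_batch(&batch, &mask).unwrap(); // keeps rows 1 and 3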

    /// Computes the filter mask for the input RecordBatch based on the pushed-down predicates.
    ///
    /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
    ///
    /// # Arguments
    /// * `input` - The RecordBatch to compute the mask for
    /// * `skip_fields` - Whether to skip field filters based on [PreFilterMode] and the row group's delete status
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for compute_filter_mask_flat",
            })?;

        // Decodes primary keys once if we have any tag filters not in the projection.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches decoded tag arrays by column id to avoid redundant decoding.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        // Run the filters one by one and combine their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The column always matches.
                MaybeFilter::Matched => continue,
                // The column doesn't match, filter out the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Skip field filters if skip_fields is true.
            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Get the column directly by its projected index.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // The column is not in the projection, it may be a tag column.
                // Decodes primary keys if not already decoded.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                // Checks the cache first.
                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    cached_column.clone()
                } else {
                    // For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        // Caches the decoded tag column.
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
            // Otherwise: a non-tag column not in the projection; nothing to evaluate.
        }

        Ok(Some(mask))
    }
}
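
// Minimal illustrative tests for the pure byte-decoding and mask-combination
// logic above; a hedged addition that only assumes `OpType::Delete` has the
// smallest encoded value, as implied by the min-statistic check.
#[cfg(test)]
mod tests {
    use std::ops::BitAnd;

    use api::v1::OpType;
    use datatypes::arrow::buffer::BooleanBuffer;

    #[test]
    fn min_op_type_stat_decodes_delete() {
        // Statistics store the min as little-endian i32 bytes; decoding them
        // back must round-trip to the delete op type.
        let min_bytes = (OpType::Delete as i32).to_le_bytes();
        let min_op_type = i32::from_le_bytes(min_bytes);
        assert_eq!(min_op_type, OpType::Delete as i32);
    }

    #[test]
    fn filter_masks_combine_with_bitand() {
        // `precise_filter` starts from an all-set mask and ANDs in each
        // filter result, so only rows passing every filter stay selected.
        let mask = BooleanBuffer::new_set(4);
        let result = BooleanBuffer::from_iter([true, false, true, true]);
        let combined = mask.bitand(&result);
        assert_eq!(combined.count_set_bits(), 3);
        assert!(combined.value(0) && !combined.value(1));
    }
}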