mito2/sst/parquet/file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;

use crate::error::{
    DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
    StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};

/// A range of a parquet SST. Currently, a range maps to a row group.
/// Different file ranges of the same file can be read in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }
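
    // A minimal sketch of the check above, with made-up numbers (the
    // selection and row counts are illustrative, not from a real file):
    //
    //     use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    //     // 10 of 15 rows selected: a 15-row row group is not fully selected,
    //     // so `select_all` would return false.
    //     let selection = RowSelection::from(vec![
    //         RowSelector::select(10),
    //         RowSelector::skip(5),
    //     ]);
    //     assert_eq!(selection.row_count(), 10);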

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if the row group does not contain DELETE
            // ops and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No last-row selector provided, use RowGroupReader.
            false
        };

        let prune_reader = if use_last_row_reader {
            // The row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader)
        } else {
            // The row group may contain DELETE ops, or not all rows are
            // selected; fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            )
        };

        Ok(prune_reader)
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}
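
// A minimal usage sketch, assuming an async context, a caller that already
// built a `FileRange`, and `PruneReader::next_batch` as the batch-pulling
// method (the surrounding setup is hypothetical):
//
//     // Ask for the last row per time series; `reader` falls back to the
//     // plain row group reader when the fast path does not apply.
//     let mut reader = range.reader(Some(TimeSeriesRowSelector::LastRow)).await?;
//     while let Some(batch) = reader.next_batch().await? {
//         // Consume the pruned batches.
//     }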

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the `CompatBatch` for the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the
    /// input batch. Returns the filtered batch, or `None` if the entire batch
    /// is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
        self.base.precise_filter(input)
    }

    /// Decodes the parquet metadata and checks whether the row group contains
    /// DELETE ops.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        let row_group_metadata = &metadata.row_groups()[row_group_index];

        // Safety: the last column of an SST must be op_type.
        let column_metadata = &row_group_metadata.columns().last().unwrap();
        let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
            file_path: self.reader_builder.file_path(),
        })?;
        stats
            .min_bytes_opt()
            .context(StatsNotPresentSnafu {
                file_path: self.reader_builder.file_path(),
            })?
            .try_into()
            .map(i32::from_le_bytes)
            .map(|min_op_type| min_op_type == OpType::Delete as i32)
            .ok()
            .context(DecodeStatsSnafu {
                file_path: self.reader_builder.file_path(),
            })
    }
}
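
// A minimal sketch of the statistics decoding performed above: the min value
// of the op_type column is stored as little-endian `i32` bytes, so a min of
// `OpType::Delete` means the row group contains DELETE ops. The bytes below
// are fabricated for illustration rather than read from a real column chunk.
#[cfg(test)]
mod op_type_stats_tests {
    use api::v1::OpType;

    #[test]
    fn decode_min_op_type_from_le_bytes() {
        // Simulate the raw min bytes a parquet column chunk statistic carries.
        let min_bytes = (OpType::Delete as i32).to_le_bytes();
        // Mirror the production path: &[u8] -> [u8; 4] -> i32.
        let decoded: [u8; 4] = min_bytes.as_slice().try_into().unwrap();
        let min_op_type = i32::from_le_bytes(decoded);
        assert_eq!(min_op_type, OpType::Delete as i32);
    }
}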

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the
    /// input batch. Returns the filtered batch, or `None` if the entire batch
    /// is filtered out.
    ///
    /// The supported filter expression types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run the filters one by one and combine their results.
        // TODO(ruihang): run primary key filters first. They may short circuit other filters.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter out the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key column.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A primary key mismatch means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    let Some(field_index) =
                        self.read_format.field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
}
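
// A minimal sketch of how `precise_filter` combines per-filter masks: start
// from an all-set buffer and AND each filter's result into it. The buffers
// here are fabricated for illustration.
#[cfg(test)]
mod mask_tests {
    use std::ops::BitAnd;

    use datatypes::arrow::buffer::BooleanBuffer;

    #[test]
    fn masks_combine_with_bitand() {
        // All rows selected, as `precise_filter` starts with.
        let mut mask = BooleanBuffer::new_set(4);
        // A filter result keeping only the even rows.
        let filter_result = BooleanBuffer::collect_bool(4, |i| i % 2 == 0);
        mask = mask.bitand(&filter_result);
        assert_eq!(mask.count_set_bits(), 2);
    }
}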