mito2/sst/parquet/file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.
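//!
//! A typical flow, as a minimal sketch (call sites in this crate may differ;
//! `context` is a prebuilt [FileRangeContextRef] and `row_group_idx` is the
//! index of a row group in the SST):
//!
//! ```ignore
//! let range = FileRange::new(context, row_group_idx, None);
//! let mut reader = range.reader(None).await?;
//! while let Some(batch) = reader.next_batch().await? {
//!     // Consume batches that already went through pruning and precise
//!     // filtering.
//! }
//! ```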

use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::read::Batch;
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};

/// A range of a parquet SST. Currently a range is a row group.
/// We can read different file ranges in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }
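
    // A minimal sketch of the `RowSelection` semantics that `select_all` relies
    // on, using only the public `parquet` crate API (illustration, not part of
    // the read path):
    //
    //     use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    //     // Select rows 0..10, then skip rows 10..15 of a 15-row group.
    //     let selection = RowSelection::from(vec![
    //         RowSelector::select(10),
    //         RowSelector::skip(5),
    //     ]);
    //     // 10 selected rows != 15 rows in the group, so not "select all".
    //     assert_eq!(selection.row_count(), 10);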

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use the last-row reader if the row group does not contain
            // DELETE ops and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No last-row selector provided, use RowGroupReader.
            false
        };

        let prune_reader = if use_last_row_reader {
            // The row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader)
        } else {
            // The row group may contain DELETE ops or the range doesn't select
            // all rows, fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            )
        };

        Ok(prune_reader)
    }

    /// Creates a flat reader that returns [RecordBatch]es.
    pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader =
            FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader);

        Ok(flat_prune_reader)
    }
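
    // A minimal sketch of the flat read path (a hypothetical call site; the
    // methods for pulling record batches out of [FlatPruneReader] live in
    // crate::read::prune):
    //
    //     let mut reader = range.flat_reader().await?;
    //     // ...pull arrow `RecordBatch`es out of `reader`...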

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns the filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the `CompatBatch` for the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicate precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
        self.base.precise_filter(input)
    }

    /// Filters the input [RecordBatch] by the pushed-down predicate and returns
    /// the filtered [RecordBatch].
    pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input)
    }

    /// Decodes the parquet metadata and checks whether the row group contains delete ops.
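    ///
    /// This only inspects the min statistic of the `op_type` column: assuming
    /// `OpType::Delete` maps to the smallest op type value in `api::v1`, any
    /// delete in the row group drags the column minimum down to
    /// `OpType::Delete as i32`, so checking the min alone is sufficient.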
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        let row_group_metadata = &metadata.row_groups()[row_group_index];

        // Safety: the last column of an SST must be op_type.
        let column_metadata = row_group_metadata.columns().last().unwrap();
        let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
            file_path: self.reader_builder.file_path(),
        })?;
        stats
            .min_bytes_opt()
            .context(StatsNotPresentSnafu {
                file_path: self.reader_builder.file_path(),
            })?
            .try_into()
            .map(i32::from_le_bytes)
            .map(|min_op_type| min_op_type == OpType::Delete as i32)
            .ok()
            .context(DecodeStatsSnafu {
                file_path: self.reader_builder.file_path(),
            })
    }
}

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicate precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and stores it in the batch.
    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Runs the filters one by one and combines their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The column matches.
                MaybeFilter::Matched => continue,
                // The column doesn't match, filter out the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key column.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A mismatched PK means the entire batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Safety: the input is a Batch, so we are using the primary key format.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
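
    // A minimal sketch of the mask combination above, using only arrow buffer
    // types (illustration, not part of the read path):
    //
    //     let mask = BooleanBuffer::new_set(4); // all rows pass: 1111
    //     let result = BooleanBuffer::from_iter([true, false, true, true]);
    //     let mask = mask.bitand(&result); // 1011: row 1 is filtered out
    //     assert_eq!(mask.count_set_bits(), 3);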

    /// Filters the input [RecordBatch] by the pushed-down predicate and returns
    /// the filtered [RecordBatch], or `None` if all rows are filtered out.
    pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Runs the filters one by one and combines their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The column matches.
                MaybeFilter::Matched => continue,
                // The column doesn't match, filter out the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Gets the column directly by its projected index.
            let Some(idx) = flat_format.projected_index_by_id(filter_ctx.column_id()) else {
                // The column is not in the projection, skip this filter.
                continue;
            };
            let column = &input.columns()[idx];
            let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
            mask = mask.bitand(&result);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }
}
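
// A self-contained sketch (not part of the original module) of the arrow
// `filter_record_batch` call used by `precise_filter_flat` above: the boolean
// mask keeps exactly the rows whose mask bit is set. The schema and values
// here are made up for illustration.
#[cfg(test)]
mod mask_filter_example {
    use std::sync::Arc;

    use datatypes::arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use datatypes::arrow::compute::filter_record_batch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    #[test]
    fn filter_record_batch_by_mask() {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
        // Keep rows 0 and 2 only.
        let mask = BooleanArray::from(vec![true, false, true, false]);
        let filtered = filter_record_batch(&batch, &mask).unwrap();
        assert_eq!(filtered.num_rows(), 2);
    }
}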