mito2/sst/parquet/
file_range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.
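//!
//! A hedged usage sketch (how the ranges are built and how batches are
//! consumed here are assumptions, not APIs defined in this file):
//!
//! ```ignore
//! // `ranges` is assumed to be built elsewhere, one [FileRange] per row group.
//! for range in ranges {
//!     let mut reader = range.reader(None).await?;
//!     while let Some(batch) = reader.next_batch().await? {
//!         // Process the pruned batch.
//!     }
//! }
//! ```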

use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;

use crate::error::{
    DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};

/// A range of a parquet SST. Currently a range is a row group.
/// Different file ranges can be read in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group in the SST.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange].
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
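    ///
    /// A hedged sketch of the check with illustrative values (`RowSelector`
    /// is `parquet::arrow::arrow_reader::RowSelector`):
    ///
    /// ```ignore
    /// // For a 100-row group, a selection covering all 100 rows selects all.
    /// let all = RowSelection::from(vec![RowSelector::select(100)]);
    /// assert_eq!(all.row_count(), 100);
    /// // Selecting 10 rows and skipping the remaining 90 does not.
    /// let partial = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
    /// assert_eq!(partial.row_count(), 10);
    /// ```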
    fn select_all(&self) -> bool {
        let Some(row_selection) = &self.row_selection else {
            return true;
        };

        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();
        row_selection.row_count() == rows_in_group as usize
    }

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only use LastRowReader if row group does not contain DELETE
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // Selector is absent or not LastRow, use RowGroupReader.
            false
        };

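        // Summary of the decision above: LastRowReader is used only when the
        // caller asked for `TimeSeriesRowSelector::LastRow`, the row group is
        // PUT-only, and the row selection covers the whole group.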
        let prune_reader = if use_last_row_reader {
            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader)
        } else {
            // Not eligible for LastRowReader: there is no LastRow selector, the
            // row group may contain DELETE ops, or not all rows are selected.
            // Fall back to the default reader.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            )
        };

        Ok(prune_reader)
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file range.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Row group reader builder for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns filters pushed down.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the reader builder.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the `CompatBatch` for the context.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries its best to apply the pushed-down predicates precisely to the
    /// input batch. Returns the filtered batch, or `None` if the entire batch
    /// is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
        self.base.precise_filter(input)
    }

    /// Checks whether the row group contains delete ops by decoding the min
    /// statistic of the `op_type` column in the parquet metadata.
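    ///
    /// A hedged sketch of the decoding (illustrative; it relies on `Delete`
    /// having the smallest `OpType` value, which the check below depends on):
    /// the min statistic is stored as a 4-byte little-endian `i32`.
    ///
    /// ```ignore
    /// let min_bytes: &[u8] = stats.min_bytes_opt().unwrap();
    /// let min_op_type = <[u8; 4]>::try_from(min_bytes).map(i32::from_le_bytes).unwrap();
    /// let contains_delete = min_op_type == OpType::Delete as i32;
    /// ```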
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        let row_group_metadata = &metadata.row_groups()[row_group_index];

        // Safety: The last column of SST must be op_type
        let column_metadata = &row_group_metadata.columns().last().unwrap();
        let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
            file_path: self.reader_builder.file_path(),
        })?;
        stats
            .min_bytes_opt()
            .context(StatsNotPresentSnafu {
                file_path: self.reader_builder.file_path(),
            })?
            .try_into()
            .map(i32::from_le_bytes)
            .map(|min_op_type| min_op_type == OpType::Delete as i32)
            .ok()
            .context(DecodeStatsSnafu {
                file_path: self.reader_builder.file_path(),
            })
    }
}

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
}

impl RangeBase {
    /// Tries its best to apply the pushed-down predicates precisely to the
    /// input batch. Returns the filtered batch, or `None` if the entire batch
    /// is filtered out.
    ///
    /// Supported filter expr types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
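    ///
    /// Per-filter results are combined into a single mask with a running AND;
    /// a hedged sketch with illustrative values:
    ///
    /// ```ignore
    /// let mut mask = BooleanBuffer::new_set(4); // all four rows pass initially
    /// let result = BooleanBuffer::from_iter([true, false, true, true]);
    /// mask = mask.bitand(&result); // row 1 is filtered out
    /// assert_eq!(mask.count_set_bits(), 3);
    /// ```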
    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Run the filters one by one and combine their results.
        // TODO(ruihang): run primary key filter first. It may short circuit other filters
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(self.codec.decode(input.primary_key())?);
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a primary key
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(FieldTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(FieldTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(FilterRecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // A primary key mismatch filters out the entire batch.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    let Some(field_index) =
                        self.read_format.field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(FilterRecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(FilterRecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
}
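
#[cfg(test)]
mod tests {
    use std::ops::BitAnd;

    use datatypes::arrow::buffer::BooleanBuffer;

    // A minimal, self-contained sketch of the mask-combination pattern used
    // by `RangeBase::precise_filter`: start from an all-set mask and AND in
    // each filter's result. The values are illustrative.
    #[test]
    fn combine_filter_masks() {
        let mut mask = BooleanBuffer::new_set(4);
        let filter_a = BooleanBuffer::from_iter([true, false, true, true]);
        let filter_b = BooleanBuffer::from_iter([true, true, false, true]);
        mask = mask.bitand(&filter_a);
        mask = mask.bitand(&filter_b);
        // Rows 1 and 2 are filtered out; rows 0 and 3 remain.
        assert_eq!(mask.count_set_bits(), 2);
    }
}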